springboot kafka在kafka server AUTH变动后consumer自动销毁
前言
笔者使用了kafka用来传输数据,笔者在今年10月写了文章,怎么使用配置化实现kafka的装载:springboot kafka多数据源,通过配置动态加载发送者和消费者-CSDN博客
不过在实际运行中,kafka broker是加密的,本来也没啥,但是突然的一天笔者在监控发现消费者掉线了,发送者居然还是正常的,见鬼的事情就是这么朴实的发生了,而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set
这样的错误,还有
Closing the Kafka consumer Kafka consumer has been closed
这样的日志,非常诡异,后面查询kafka集群才知道,kafka集群在某个时间被改了AUTH配置,然后又改回来了,神奇的操作。
现象
实际上看到日志是懵的,毕竟kafka是对方提供的,AUTH用户名和密码也是对方给的,而且发送者也出现AUTH失败,但是发送者一直重试,然后因为kafka集群的AUTH改回来了,重试成功了;唯独消费者AUTH失败后,关闭了。
源码分析
从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set
发现日志出现在
org.springframework.kafka.listener.KafkaMessageListenerContainer
刚好在消息监听器容器的内部类
ListenerConsumer
看源码定义,是一个定时线程池的任务定义
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback
分析怎么去消费的KafkaMessageListenerContainer的doStart方法
既然明晰了,那么分析问题的来源, run方法
public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent(); //事件通知,spring事件this.consumerThread = Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count = 0;this.last = System.currentTimeMillis();initAssignedPartitions(); // 初始分配partitionspublishConsumerStartedEvent(); // 消费者启动事件Throwable exitThrowable = null;this.lastReceive = System.currentTimeMillis();while (isRunning()) { //状态在上面的截图代码已经更新为运行状态try {pollAndInvoke(); // 队列拉取,拉取过程会出现各种异常}catch (NoOffsetForPartitionException nofpe) { //这个需要注意,但是这个是不可配置的this.fatalError = true;ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");exitThrowable = nofpe;break;}catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败,就是来源于kafka broker的auth鉴权if (this.authExceptionRetryInterval == null) { //时间配置,默认居然是nullListenerConsumer.this.logger.error(ae,"Authentication/Authorization Exception and no authExceptionRetryInterval set");this.fatalError = true;exitThrowable = ae;break; //循环结束}else { // 一段时间后重试ListenerConsumer.this.logger.error(ae,"Authentication/Authorization Exception, retrying in "+ this.authExceptionRetryInterval.toMillis() + " ms");// We can't pause/resume here, as KafkaConsumer doesn't take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError = true;ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG+ "' has been fenced");exitThrowable = fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, "Stopping container due to fencing");stop(false);exitThrowable = e;}catch (Error e) { // NOSONAR - rethrownthis.logger.error(e, "Stopping container due to an Error");this.fatalError = true;wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}finally {clearThreadState();}}//上面异常后,这里的处理wrapUp(exitThrowable);}
注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException,其中能配置重试的是AuthenticationException | AuthorizationException异常,就是授权失败可以通过配置过一段时间抢救一下,一般而言,kafka首次授权失败基本上就不太可能成功了,但是这个只能控制consumer销毁,producer还在重试,所以出现了消费者销毁了,发送者在kafka集群auth还原后成功恢复。看看wrapUp方法
既然知道了原理,那么解决办法是配置
authExceptionRetryInterval
解决方法
配置authExceptionRetryInterval也是不容易,分析取值
private final Duration authExceptionRetryInterval =this.containerProperties.getAuthExceptionRetryInterval();
从 containerProperties来的,实际上是继承org.springframework.kafka.listener.ConsumerProperties
在spring-kafka 2.8版本还对属性命名重构了,毕竟以前的命名字母太多了,😁
不过要修改这个值也不容易,kafkaproperties并没有提供这个参数,而且创建消费者容器工厂时
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
那么只能在
ConcurrentMessageListenerContainer
创建成功后,从这个里面读取配置,设置默认值。这就可以使用前面埋点的spring事件了,通过事件拿到Consumer,不就可以修改配置了
使用
publishConsumerStartedEvent();
的事件最合适,执行循环前最后一个事件,而且这里的this,就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象,就是我们需要的
编写代码如下:
@Component
public class KafkaConsumerStartedListener implements ApplicationListener<ConsumerStartedEvent> {@Overridepublic void onApplicationEvent(ConsumerStartedEvent event) {KafkaMessageListenerContainer<?, ?> container = event.getSource(KafkaMessageListenerContainer.class);container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));}
}
就这样就解决了问题,哈哈哈贼简单。
笔者在使用API时发现了还有一个API是授权失败后重启,是布尔变量,默认值是false,即不重试。那么这个是哪里触发的呢,在我们看到的消费者销毁事件里面,实际上控制重试有多种办法,一个是循环去kafka broker拉取,一个是重启kafka消费者容器。
这个在上面已经说明了,简单截个图再说明一下
设置的地方还是上面的代码里面,同理也可以修改事件为
ConsumerStoppedEvent
可以更精准,当然就用刚刚的代码加一行设置也是可以的。
总结
kafka在发送者和消费者是区分开的,发送者如果连接kafka broker失败后可以一直重试直到成功,但是消费者确有各种各样的逻辑,可以精准控制,比如消费者重启的配置
restartAfterAuthExceptions
可以控制消费者在停止时重启,如果仅仅是授权失败,而且不需要反复重启(消耗资源),那么可以通过
authExceptionRetryInterval
配置时间周期的方式实现,但是kafka并没有给我们配置的入口,但是kafka在消费者启动消费的过程埋了很多spring事件钩子,通过这些钩子可以操作,估计spring-kafka也不希望我们去修改,毕竟消费者启动失败了或授权失败了,消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件,可以在启动完成通过
org.springframework.kafka.config.KafkaListenerEndpointRegistry
拿到所有消费者容器,来批量设置属性,毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。
相关文章:
springboot kafka在kafka server AUTH变动后consumer自动销毁
前言 笔者使用了kafka用来传输数据,笔者在今年10月写了文章,怎么使用配置化实现kafka的装载:springboot kafka多数据源,通过配置动态加载发送者和消费者-CSDN博客 不过在实际运行中,kafka broker是加密的,…...
第六届国际科技创新(IAECST 2024)暨第四届物流系统与交通运输(LSTT 2024)
重要信息 会议官网:www.lstt.org 大会时间:2024年12月6-8日 大会地点:中国-广州 简介 第六届国际科技创新暨第四届物流系统与交通运输国际(LSTT 2024)将于2024年12月6-8日在广州举办,这是一个集中探讨…...
【Vue3】【Naive UI】< a >标签
【Vue3】【Naive UI】< a >标签 超链接及相关属性其他属性 【VUE3】【Naive UI】<NCard> 标签 【VUE3】【Naive UI】<n-button> 标签 【VUE3】【Naive UI】<a> 标签 <a> 标签HTML中的一个锚&…...
Fortran mpi在Linux的安装
最近编译一个程序需要需要 Fortran mpi 编译器,则需要安装 Fortran编辑器和MPI库,以下是具体的安装步骤: 一、安装 Fortran 编译器(gfortran) 在conda环境中安装: conda install -c conda-forge gfortra…...
蓝桥-希尔排序模板题
第一眼看到这个题还在想希尔排序模板不记得了,于是去网上了搜了一个,但是考虑到这种题只看测试点能不能通过,于是用Arrays方法试了一下,发现也可以。 1.希尔排序模板ac代码 package yunkePra;import java.util.Scanner;public cl…...
深入学习指针(5)!!!!!!!!!!!!!!!
文章目录 1.回调函数是什么?2.qsort使用举例2.1使用qsort函数排序整形数据2.2使用sqort排序结构数据 3.qsort函数的模拟实现 1.回调函数是什么? 回调函数就是⼀个通过函数指针调⽤的函数。 如果你把函数的指针(地址)作为参数传递…...
windows 应用 UI 自动化实战
UI 自动化技术架构选型 UI 自动化是软件测试过程中的重要一环,网络上也有很多 UI 自动化相关的知识或资料,具体到 windows 端的 UI 自动化,我们需要从以下几个方面考虑: 开发语言 毋庸置疑,在 UI 自动化测试领域&am…...
nodejs相关知识介绍
1、nodejs官方文档: https://nodejs.org/zh-cn nodejs可以用nvm进入安装; 2、npm说明: npm官方教程:https://npm.p2hp.com/ npm是 Node.js 的标准包管理器,也就是说nodejs安装好,npm也就安装好了&#…...
How to monitor Spring Boot apps with the AppDynamics Java Agent
本文介绍如何使用 AppDynamics Java 代理监视 Azure Spring Apps 中的 Spring Boot 应用程序。 使用 AppDynamics Java 代理可以: 监视应用程序使用环境变量配置 AppDynamics Java 代理 在 AppDynamics 仪表板中检查所有监视数据 How to monitor Spring Boot app…...
安装SQL Server 2022提示需要Microsoft .NET Framework 4.7.2 或更高版本
安装SQL Server 2022提示需要Microsoft .NET Framework 4.7.2 或更高版本。 原因是:当前操作系统版本为Windows Server 2016 Standard版本,其自带的Microsoft .NET Framework 版本为4.6太低,不满足要求。 根据报错的提示,点击链接…...
TypeScript核心语法(5)——函数
简介 函数的类型声明,需要在声明函数时,给出参数的类型和返回值的类型。 function hello(a: string): void {console.log("hello " txt); } 上面示例中,函数hello()在声明时,需要给出参数a的类型(stri…...
【MyBatis】验证多级缓存及 Cache Aside 模式的应用
文章目录 前言1. 多级缓存的概念1.1 CPU 多级缓存1.2 MyBatis 多级缓存 2. MyBatis 本地缓存3. MyBatis 全局缓存3.1 MyBatis 全局缓存过期算法3.2 CacheAside 模式 后记MyBatis 提供了缓存切口, 采用 Redis 会引入什么问题?万一遇到需强一致场景&#x…...
ARIMA-神经网络混合模型在时间序列预测中的应用
ARIMA-神经网络混合模型在时间序列预测中的应用 1. 引言 1.1 研究背景与意义 时间序列预测在现代数据科学中扮演着越来越重要的角色。从金融市场的价格走势到工业生产的需求预测,从气象数据的天气预报到用电量的负荷预测,时间序列分析无处不在。传统的统计方法和现代深度学习…...
Scala关于成绩的常规操作
score.txt中的数据: 姓名,语文,数学,英语 张伟,87,92,88 李娜,90,85,95 王强,78,90,82 赵敏,92,8…...
【Maven】项目创建
3. Maven的应用 本章主要内容: 使用 Maven 创建 JavaSE 项目使用 Maven 创建 JavaWeb 项目,在本地部署 Tomcat 测试导入 Maven 项目 3.1 基于Maven开发JavaSE的项目 3.1.1 流程 1、File—>new—>Project—>Empty Project Location࿱…...
基于 LlamaFactory 的 LoRA 微调模型支持 vllm 批量推理的实现
背景 LlamaFactory 的 LoRA 微调功能非常便捷,微调后的模型,没有直接支持 vllm 推理,故导致推理速度不够快。 LlamaFactory 目前支持通过 VLLM API 进行部署,调用 API 时的响应速度,仍然没有vllm批量推理的速度快。 …...
Vue进阶之单组件开发与组件通信
书接上篇,我们了解了如何快速创建一个脚手架,现在我们来学习如何基于vite创建属于自己的脚手架。在创建一个新的组件时,要在新建文件夹中打开终端创建一个基本的脚手架,可在脚手架中原有的文件中修改或在相应路径重新创建…...
HCIE IGP双栈综合实验
实验拓扑 实验需求及解法 本实验模拟ISP网络结构,R1/2组成国家骨干网,R3/4组成省级网络,R5/6/7组成数据中 心网络。 配置所有ipv4地址,请自行测试直连。 R1 sysname R1 interface GigabitEthernet0/0/0ip address 12.1.1.1 255.…...
Unity 超链接文本类
注:该脚本在文本显示不全时会有问题。 HyperlinkText.cs using System; using System.Text; using System.Collections.Generic; using System.Text.RegularExpressions; using UnityEngine; using UnityEngine.UI; using UnityEngine.EventSystems;namespace MYT…...
Vim小白学习指南
博客 Vim编辑器简介 Vim是一个非常高效的文本编辑器,最初源于Vi编辑器。它以其强大的文本编辑能力和快捷键而闻名于程序员和系统管理员。Vim的特别之处在于它提供了多种模式,每种模式都有不同的功能。 Vim的基本模式 1. 普通模式(Normal …...
【微服务】Nacos配置管理
一、统一配置管理 1、配置统一管理 2、微服务获取配置 ①引入Nacos的配置管理客户端依赖(usersevice下) <!--nacos的配置管理依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-confi…...
从单机缓存到分布式缓存那些事
作者:秦怀 1 缓存前世今生 1.1 故事从硬件开始 Cache 一词来源于 1967 年的一篇电子工程期刊论文。其作者将法语词“cache”赋予“safekeeping storage”的涵义,用于电脑工程领域。当时没有 Cache,CPU 和内存都很慢,CPU 直接访…...
华为新手机和支付宝碰一下 带来更便捷支付体验
支付正在变的更简单。 11月26日,华为新品发布会引起众多关注。发布会上,华为常务董事余承东专门提到,华为Mate 70和Mate X6折叠屏手机的“独门支付秘技”——“碰一下”,并且表示经过华为和支付宝的共同优化,使用“碰…...
element ui select绑定的值是对象的属性时,显示异常.
需要声明 value-key"value",如果还不行可能是数据类型不一致数字0和字符串0是不一致的. el-select v-model"value" clearable placeholder"Select" value-key"value" style"width: 240px"><!-- <el-option v-for&…...
基于Springboot开发的时光兼职网
一、功能介绍 时光兼职网包含管理员、用户、商家三个角色以及前后台系统。 前台系统功能 首页、兼职信息推荐、查看更多等 职位申请、申请日期、上传简历、点击下载简历、留言反馈等 个人中心、上传图片、更新信息等 后台系统功能 用户登录: 个人中心、修改密码…...
Vue3 Ts 如何获取组件的类型
vue3 Ts ref 子组件 1、默认写法 typeof:获取ts类型 InstanceType:获取模版的实例 <tempolate><myComponent ref"myCompRef"> </tempolate><script setup lang"ts"> import { ref } from "vue&quo…...
Unity类银河战士恶魔城学习总结(P146 Delete Save file-P147 Encryption of save data删除数据和加密数据)
【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili 教程源地址:https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了快速删除存档和加密存档 以下是加密前和加密后的对比 SaveManager.cs using System.Collections; using System.Collection…...
Uniapp 使用自定义字体
技术栈:Uniapp 简介 为了更好的还原UI图片效果,往往需要使用特殊字体,引入字体包。 因实际业务运行平台在微信小程序上,对发布包的项目文件大小有限制,项目中某些比较大的静态资源需要放在服务器上来远程加载&#x…...
Scala
统计成绩练习 1.计算每个同学的总分和平均分 2.统计每个科目的平均分 3.列出总分前三名和单科前三名,并保存结果到文件中 解题思路如下: 1.读入txt文件,按行读入 2.处理数据 (1)计算每个同学的总分…...
fnOS中安装HAOS,集成haier
只作为自己记录重要事项,不做详细教程。大致流程 安装飞牛OS,简称fnosfnos中有集成Docker在docker中安装haos在haos中安装hacs在hacs中添加haier 在docker中安装haos 安装好fnos后,docker里面找到haos,里面下载最多的,…...
基于群晖搭建个人图书架-TaleBook based on Docker
前言 在群晖Container Manager中部署失败,转通过ssh部署。 一、准备工作 名称备注群晖SSH“终端机和SNMP”中启用SSH软件secureCRT等docker-compose.ymlGithub下载并修改 二、过程 2.1 创建本地文件夹 本地路径为: /docker/Calibre/data 2.2 下载d…...
spring导出多个文件,要求打包成压缩包
背景 业务要求我们批量生成一批excel,并将这些excel压缩成一个压缩包导出给前端。 实现 java自带了ZipOutputStream,可以直接生成压缩包,因此,我们直接使用这个,在内存中生成压缩包,直接返回给前端。&am…...
Vue 3中实现多个自定义组件之间的切换
在 Vue 3 中,如果你想在 HTML 页面中实现多个自定义组件之间的切换,你可以使用 Vue 的条件渲染功能,比如 v-if、v-else-if 和 v-else 指令,或者使用 <component> 标签结合 is 属性来动态绑定组件。 1. 打开HBuilder X 图1 …...
opengl 三角形
最后效果: OpenGL version: 4.1 Metal 不知道为啥必须使用VAO 才行。 #include <glad/glad.h> #include <GLFW/glfw3.h>#include <iostream> #include <vector>void framebuffer_size_callback(GLFWwindow *window, int width, int heigh…...
shell脚本练习(2)
1. 使用case实现成绩优良差的判断 2. for创建20用户 用户前缀由用户输入 用户初始密码由用户输入 例如:test01,test10 3. for ping测试指网段的主机 网段由用户输入,例如用户输入192.168.2 ,则ping 192.168.2.10 --- 192.168.2.2…...
JS数组的一些方法
前言 忘了在哪里听说JS是用来处理各种各样的数据的,所以掌握一些数组的处理方法极其重要 而最近学校要进行测试,本着复习回顾的想法,决定将一些我所知道的数组处理方法整理整理 不过难免有遗漏与错误,还望各位大佬指正 forEac…...
学习threejs,使用CubeCamera相机创建反光效果
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:threejs gis工程师 文章目录 一、🍀前言1.1 ☘️CubeCamera 立方体相机 二、…...
#渗透测试#红蓝攻防#HW#漏洞挖掘#漏洞复现01-笑脸漏洞(vsftpd)
免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停…...
使用nginx请求转发时前端报跨域问题解决
当其他接口都没有问题,后端也进行了跨域的配置时,此时问题应该就出现在nginx中 我发现当上传文件大小小于1m时并不会发生错误,所以我们应该配置一下nginx允许上传文件的大小 在nginx.conf中添加 在nginx目录下重启nginx即可 (Wi…...
贪心算法入门(二)
第1题 越野跑 查看测评数据信息 为了能在下一次跑步比赛中有好的发挥,桐桐在一条山路上开始了她的训练 。桐桐希望能在每次训练中跑得尽可能远,不过她也知道农场中的一条规定:独自进山的时间不得超过M秒(1 < M < 10,000,000)。 整条…...
欧拉函数——acwing
题目一:欧拉函数 873. 欧拉函数 - AcWing题库 分析(欧拉函数相关知识点) 互质数不了解可以参考之前笔记,以便更好了解: 数论—快速幂,欧几里得及其扩展,逆元,单位元_数论单位元函…...
Spring集成Mybatis的实现
实现步骤大纲 第一步:准备数据库表 使用t_act表(账户表) 第二步:IDEA中创建一个模块,并引入依赖 spring-contextspring-jdbcmysql驱动mybatismybatis-spring:mybatis提供的与spring框架集成的依赖德鲁伊连…...
Redis中的分布式锁(步步为营)
分布式锁 概述 分布式锁指的是,所有服务中的所有线程都去获取同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,直到持有锁的线程释放锁。 分布式锁是可以跨越多个实例,多个进程的锁 分布…...
Linux下的三种 IO 复用
目录 一、Select 1、函数 API 2、使用限制 3、使用 Demo 二、Poll 三、epoll 0、 实现原理 1、函数 API 2、简单代码模板 3、LT/ET 使用过程 (1)LT 水平触发 (2)ET边沿触发 4、使用 Demo 四、参考链接 一、Select 在…...
微服务即时通讯系统的实现(服务端)----(2)
目录 1. 语音识别子服务的实现1.1 功能设计1.2 模块划分1.3 模块功能示意图1.4 接口的实现 2. 文件存储子服务的实现2.1 功能设计2.2 模块划分2.3 模块功能示意图2.4 接口的实现 3. 用户管理子服务的实现3.1 功能设计3.2 模块划分3.3 功能模块示意图3.4 数据管理3.4.1 关系数据…...
数据库原理-期末复习基础知识第二弹
1、数据的逻辑独立性是指 外模式/模式映像 当模式改变的时候,由数据库管理员对各个外模式/模式的映像做出相应改变,使外模式保持不变。由于应用程序是按照外模式进行编写的,故应用程序不必修改,保证了数据与程序的逻辑独立性。 …...
智能云在线编辑网站(完结篇)
开始及初步计划 1.前端tiptip编辑器框架vue3 2.后端Pythonflaskmysql 3.大模型调用:飞桨系列(ppasr) 前言:以此篇谨记从软件杯到天津生成式ai答辩过程及结束。 『如蚍蜉见青天,双肩难挑日月』,感叹世事多…...
多源传感器构建机器人的Gazebo模型
构建包含GNSS、IMU、LiDAR、Camera传感器的Gazebo模型涉及多个步骤,包括设置工作环境、创建URDF文件、安装必要的Gazebo插件和依赖项。以下是一个详细的步骤指南,帮助你开始构建这个Gazebo模型。 1. 设置工作环境 首先,确保你已经安装了ROS…...
linux中top 命令返回数据解释
当您在 Linux 终端中运行 top 命令时,它会显示一个动态更新的系统状态视图,其中包括许多有关系统性能的数据。下面是对 top 命令返回数据的详细解释: 标题栏 top - 22:46:12 up 2 days, 3:14, 1 user, load average: 0.05, 0.07, 0.09 22:46:12:当前时间。up 2 days, 3:14…...
【Vue3】【Naive UI】<NDropdown>标签
【Vue3】【Naive UI】 标签 基本设置自定义渲染交互事件其他属性 【VUE3】【Naive UI】<NCard> 标签 【VUE3】【Naive UI】<n-button> 标签 【VUE3】【Naive UI】<a> 标签 【VUE3】【Naive UI】<…...