004 Kafka异常处理
6.异常处理
文章目录
- 6.异常处理
- 1.异常分类与处理原则
- 2.生产者异常处理
- 1. 同步发送捕获异常
- 2. 异步发送回调处理
- 3.消费者异常处理
- 1.全局异常处理器
- 2.方法级处理
- 3.重试yml配置
- 4.死信队列(DLQ)配置
- 1. 启用死信队列
- 2. 手动发送到DLQ
- 5.事务场景异常处理
- 1. 声明式事务
- 2. 事务异常回滚
- 6.监控与告警
- 1. Actuator 健康检查
- 2. Prometheus 指标
- 7.完整异常处理流程
- 8.最佳实践总结
来源参考的deepseek,如有侵权联系立删
1.异常分类与处理原则
异常类型 | 典型场景 | 处理建议 |
---|---|---|
可恢复异常 | 网络抖动、数据库锁冲突 | 重试机制(有限次数 + 退避策略) |
不可恢复异常 | 消息格式错误、权限不足 | 直接记录日志并进入死信队列 |
事务异常 | 事务超时、生产者ID冲突 | 终止事务并回滚操作 |
2.生产者异常处理
1. 同步发送捕获异常
public void sendSync(String topic, String message) {try {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.get(5, TimeUnit.SECONDS); // 阻塞等待结果} catch (InterruptedException | ExecutionException | TimeoutException e) {// 记录日志并触发补偿逻辑log.error("消息发送失败: {}", e.getMessage());throw new BusinessException("消息发送失败", e);}
}
2. 异步发送回调处理
public void sendAsync(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(result -> {// 发送成功处理log.info("消息发送成功: topic={}", result.getRecordMetadata().topic());},ex -> {// 发送失败处理log.error("消息发送失败", ex);if (ex instanceof RetriableException) {// 可重试异常(如网络问题)retrySend(topic, message);} else {// 不可重试异常(如消息过大)deadLetterService.saveToDlq(topic, message);}});
}
3.消费者异常处理
1.全局异常处理器
@Configuration
public class KafkaGlobalErrorConfig {// 定义全局错误处理器(支持批量/单消息模式)@Beanpublic CommonErrorHandler globalErrorHandler(KafkaTemplate<String, Object> template) {// 重试策略:3次重试,间隔5秒DefaultErrorHandler handler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), // 死信队列恢复器new FixedBackOff(5000L, 3));// 指定可重试异常类型handler.addRetryableExceptions(NetworkException.class);handler.addNotRetryableExceptions(SerializationException.class);// 偏移量提交策略handler.setCommitRecovered(true);return handler;}// 容器工厂绑定全局处理器@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory,CommonErrorHandler globalErrorHandler) {ConcurrentKafkaListenerContainerFactory<String, Object> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setCommonErrorHandler(globalErrorHandler);return factory;}
}
2.方法级处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;@Slf4j
@Configuration
public class KafkaExceptionConfig {/*** 自定义异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler orderErrorHandler() {return (message, exception, consumer) -> {// 业务相关错误处理(如库存不足)/* if (exception instanceof InventoryException) {retryService.scheduleRetry(message.getPayload());}*/System.out.println("异常执行:"+exception);return null;};}/*** 注册全局异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler globalExceptionHandler() {return (message, exception, consumer) -> {log.error("捕获消费异常: topic={}, message={}",message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),message.getPayload(),exception);// 反序列化异常特殊处理if (exception.getCause() instanceof DeserializationException) {// 跳过错消息并提交偏移量return null;}throw exception; // 其他异常继续抛出};}}
@KafkaListener(topics = "test", groupId = "spring-group",errorHandler = "globalExceptionHandler")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));//异常测试int i = 1/0;ack.acknowledge();}
3.重试yml配置
spring:kafka:listener:retry:max-attempts: 3 # 最大重试次数backoff:initial-interval: 1000 # 初始间隔(毫秒)multiplier: 2.0 # 间隔倍数exclude-exceptions: # 不重试的异常- javax.validation.ValidationException
4.死信队列(DLQ)配置
1. 启用死信队列
spring:kafka:listener:dead-letter-publish:enable: true # 自动发布到死信队列dead-letter-topic: dlq-${topic} # 死信队列命名规则
2. 手动发送到DLQ
@KafkaListener(topics = "payments")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {try {paymentService.process(event);ack.acknowledge();} catch (InvalidPaymentException ex) {// 手动发送到DLQkafkaTemplate.send("dlq-payments", event);ack.acknowledge(); // 避免重复消费}
}
5.事务场景异常处理
1. 声明式事务
@Transactional
public void processWithTransaction(Order order) {// 数据库操作orderRepository.save(order);// Kafka事务消息kafkaTemplate.send("orders", order.toEvent());// 其他业务...
}
2. 事务异常回滚
@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> pf) {return new KafkaTransactionManager<>(pf);
}@Transactional(rollbackFor = {KafkaException.class, SQLException.class})
public void transactionalProcess() {// 数据库与Kafka操作
}
6.监控与告警
1. Actuator 健康检查
management:endpoints:web:exposure:include: health,kafkahealth:kafka:enabled: true
2. Prometheus 指标
@Bean
public MicrometerConsumerListener<K, V> consumerMetrics() {return new MicrometerConsumerListener<>("kafka.consumer");
}@Bean
public MicrometerProducerListener<K, V> producerMetrics() {return new MicrometerProducerListener<>("kafka.producer");
}
7.完整异常处理流程
- 捕获异常 → 2. 分类判断 → 3. 重试/记录/DLQ → 4. 提交Offset → 5. 监控告警
8.最佳实践总结
- 分层处理:全局处理器兜底 + 方法级精细控制
- 幂等消费:确保消息重复消费时的数据安全性
- 监控覆盖:跟踪重试次数、DLQ堆积等关键指标
- 事务隔离:
@Transactional
+read_committed
保证数据一致性
相关文章:
004 Kafka异常处理
6.异常处理 文章目录 6.异常处理1.异常分类与处理原则2.生产者异常处理1. 同步发送捕获异常2. 异步发送回调处理 3.消费者异常处理1.全局异常处理器2.方法级处理3.重试yml配置 4.死信队列(DLQ)配置1. 启用死信队列2. 手动发送到DLQ 5.事务场景异常处理1.…...
C++模拟实现map和set
C模拟实现map和set 1、STL源代码分析2、实现出复用红黑树的框架3、实现红黑树的迭代器4、解决map和set中key不可修改问题5、解决insert返回值问题完整代码 模拟实现map和set实际上是对红黑树的封装,如对红黑树的实现有疑问,请移步:C手撕红黑树…...
使用elasticdump导出/导入 -- ES数据
导出指定索引数据到指定文件夹: ./elasticdump --inputhttp://用户:密码IP:9201/索引名字 --output导出路径/out.json --typedata 将导出的文件导入 ./elasticdump --input路径/out.json --outputhttp://账号:密码IP:9201/索引名称 --typedata --fileTypejson 【el…...
CSDN年度评选揭晓,永洪科技AI技术与智能应用双星闪耀
近日,永洪科技在CSDN(中国专业开发者社区)的年度评选中,凭借在人工智能技术创新与vividime在行业应用中的卓越表现,一举斩获“人工智能企业”及“智能应用”双料大奖。这一荣誉不仅彰显了永洪科技在AI领域的领先地位&a…...
Kubernetes 资源利用率翻倍?离在线混合部署深度解析
还在为 Kubernetes 集群资源利用率低而烦恼?还在为高昂的云成本而头疼?今天,我们就来聊聊 Kubernetes 中的一项黑科技——离在线混合部署,让大家的集群资源利用率翻倍,成本减半! 🤔 什么是离在线…...
【Java】Spring Boot全量YAML配置说明
目录 Spring Boot 配置文件基础核心配置日志配置Web 服务器配置数据源配置JPA 配置缓存配置国际化配置邮件服务配置自定义配置使用示例1. Spring Boot 配置文件基础 Spring Boot 的配置文件可以使用以下文件格式: application.propertiesapplication.ymlSpring Boot 默认加载…...
【STL】7.STL常用算法(1)
STL常用算法(1) 前言简介一.遍历算法1.for_each2.transform 二.查找算法1.find2.find_if3.adjacent_find4.binary_search5.count6.cout_if 三.排序算法1.sort2.random_shuffle3.merge4.reverse 总结 前言 stl系列主要讲述有关stl的文章,使用S…...
弱监督语义分割学习计划(1)-简单实现CAM但是效果不好
零: 项目说明 是这样的一个事情,经过与deepseek的一番讨论和交流,DeepSeek为我设计了一个30天高强度学习计划,重点聚焦弱监督/无监督语义分割在野外场景的应用,结合理论与实践,并最终导向可落地的开源项目。目前开始了…...
内存泄漏问题分享
在前端开发中,内存泄漏(Memory Leak)是指由于代码问题导致浏览器无法回收不再使用的内存,从而影响网页的性能,导致页面变慢,甚至崩溃。前端内存泄漏通常由以下几种原因引起,理解和修复这些问题对…...
用 DeepSeek 打样!KubeSphere LuBan 用 3 天/3 分钟“干掉”大模型部署焦虑
用 DeepSeek 打样!KubeSphere LuBan 用 3 天/3 分钟“干掉”大模型部署焦虑 大模型落地,如何告别“部署焦虑”? DeepSeek-R1 的惊艳表现无需赘述,但企业落地时的高门槛却让许多开发者望而却步——复杂的部署流程、资源调度难题、…...
Java在云计算平台中的应用研究
Java在云计算平台中的应用研究 随着云计算的广泛应用,越来越多的企业和开发者开始选择基于云计算的架构来构建和部署应用。Java作为一种成熟的编程语言,凭借其跨平台性、强大的生态系统以及优秀的并发处理能力,已成为云计算平台中常用的编程…...
【学写LibreCAD】0 仿写LibreCAD简介
一、LibreCAD 核心模块: 核心模块(Core) 功能:处理 CAD 的核心逻辑,如几何计算、图形对象管理、坐标系转换等。关键组件: 图形对象:如直线、圆、圆弧、多段线等。数学工具:向量、矩…...
【质量管理】怎么评估职能部门当前质量管理成熟度
评估目的 在做质量管理时,我们需要先了解各职能部门当前质量管理成熟度。从而识别改进机会,为各职能部门后续质量提升计划提供依据。 直白说:就是让那些不肯动的人动起来,同时往往总经理对各部门的质量管理成熟度的评分要更低&…...
音乐游戏Dance Dance Revolution(DDR)模拟器
文章目录 (一)Dance Dance Revolution(1.1)基本情况(1.2)机体 (二)模拟器(2.1)主程序(2.2)模拟器主题 (三)曲谱…...
【Pandas】pandas Series filter
Pandas2.2 Series Computations descriptive stats 方法描述Series.align(other[, join, axis, level, …])用于将两个 Series 对齐,使其具有相同的索引Series.case_when(caselist)用于根据条件列表对 Series 中的元素进行条件判断并返回相应的值Series.drop([lab…...
SpringBoot项目注入 traceId 来追踪整个请求的日志链路
SpringBoot项目注入 traceId 来追踪整个请求的日志链路,有了 traceId, 我们在排查问题的时候,可以迅速根据 traceId 查找到相关请求的日志,特别是在生产环境的时候,用户可能只提供一个错误截图,我们作为开发…...
UVM 软链接应用
软链接在环境中主要是为了代码复用,目前用到的场景有两种: 1)将UT 环境的代码 链接到ST环境上复用: 将ut 的transaction和sequence等在ST上复用 使用方法 在st对应目录下执行命令: ln -s 。…/xxxx/UT/xxx/xx_transact…...
SCIKIT-LEARN 决策树实现csv文档简单的推论预测
一.学习背景 原文来自scikit-learn的学习拓展,根据樱花分类示例衍生而来。源文开源地址:scikit-learn: machine learning in Python — scikit-learn 0.16.1 documentation,想学机器学习和数据挖掘的可以去瞧瞧! 二.读取csv文档 …...
VM虚拟机安装与配置Ubuntu Linux操作系统详细教程~
一、下载VM虚拟机 VMware16.0.zip百度网盘下载链接:https://pan.baidu.com/s/1-l-CcAVNINqhRLSiQ26R7w?pwd=tznn 提取码: tznn 二、软件介绍 VMware(虚拟机)是指通过软件模拟的具有完整硬件系统功能的、运行在一个完全隔离环境中的完整计算机系统,通过它可在一台电脑上同…...
使用 Ray的可观察性功能监控和调试 Ray 应用程序和集群
一、引言 在分布式系统中,监控和调试是确保系统稳定运行的关键环节。Ray 作为一款高性能的分布式计算框架,提供了丰富的可观察性(Observability)功能,帮助用户监控和调试 Ray 应用程序和集群。本文将详细介绍如何使用 Ray 的可观察性功能,包括监控工具、调试流程、日志管…...
jmeter 如何做移动端的测试 特别是兼容性测试
JMeter本身主要是一款用于性能测试和功能测试的工具,虽然它并非专门为移动端测试设计,但可以通过一些方式来对移动端应用进行测试,以下从测试准备、测试过程及注意事项等方面为你详细介绍: 一、测试准备 (一)环境搭建 JMeter安装与配置:确保JMeter已经正确安装在测试机…...
模板方法模式
模板方法模式(Template Method Pattern)是一种行为型设计模式,它定义了一个算法的骨架,允许子类在不改变算法结构的情况下重写某些步骤的具体实现。 核心思想 抽象类定义模板方法(final 修饰,防止子类修改…...
64位精度HPC计算引擎的十年博弈:AMD如何以性价比颠覆NVIDIA霸权?
若期望一款中央处理器(CPU)具备图形处理器(GPU)级别的浮点运算性能,根据CPU发展路线图,大约需等待六年左右。这显然是一段漫长的等待期,这也解释了为何十五年前众多高性能计算(HPC&a…...
2P4M-ASEMI照明控制专用2P4M
编辑:ll 2P4M外观与基本结构 2P4M 可控硅通常封装在一个小巧的塑料外壳内,从外观上看,它有着几个标志性的引脚。一般为三脚结构,每个引脚都肩负着不同的使命。其内部结构精密复杂,核心是由多层半导体材料交替堆叠而成…...
【工程管理与安全工程方向 EI会议征稿 | EI Compendex、Scopus收录】2025年工程管理与安全工程国际学术会议 (EMSE 2025)
重要信息: 大会官网:www.ic-emse.com【论文投稿】 大会时间:2025年3月21-23日 大会地点:中国-南京 截稿时间:以官网信息为准 出版信息:<...
VMware work station 与Device/Credential Guard不兼容
1.出现如下错误 按 下Windows徽标键R 然后输入msinfo32.exe,会出现系统信息,在系统信息里找到基于虚拟化的安全性,查看是否打开 gpedit.msc 注册表修改...
STM32开发学习(三)----使用STM32CUBEMX创建项目
前言 开始正式接触代码,学习代码开发,先熟悉STM32CUBEMX软件,控制开发板的GPIO。(STM32F103C8T6)。 正式开始 1.打开软件 2.点击ACCESS TO MCU SELECTOR,进入软件选择,可能会弹出更新,等待更新完成即可。…...
smolagents学习笔记系列(七)Examples-Self-correcting Text-to-SQL
这篇文章锁定官网教程中 Examples 章节中的 Self-correcting Text-to-SQL 文章,主要介绍了如何使用 Agent 对数据库进行查找。 官网链接:https://huggingface.co/docs/smolagents/v1.9.2/en/examples/text_to_sql; 【注意事项】:…...
ffmpeg常用方法(一)
FFmpeg是一个非常强大的开源项目,提供了一套可以用来录制、转换数字音频、视频,并能将其转换成不同格式的工具和库。它是命令行工具,意味着它没有图形用户界面,但它能够被嵌入到其他应用程序中。它支持多种操作系统,包…...
c++:多态
1.多态 多态就是多种形态的意思。 分为编译时多态,也叫静态多态,通过传递不同参数的方式使同一个函数好像实现了不同的状态。eg:函数模板,函数重载 还有运行时多态,也叫动态多态。通过使用不同的对象调用“同一个函数”…...
Nuxt.js 3【详解】服务器 Server
Nuxt.js 是一个全栈框架,可以在一个项目中,同时完成前端和后端的开发。 服务器架构 Nuxt.js 的服务端由 Nitro 实现,Nitro 由基于 H3 实现。 Nitro 官网 https://nitro.build/guideH3 官网 https://h3.unjs.io/guide 接口路由 基于文件目录自…...
DeepSeek回答:AI时代Go语言学习路线
最近有小伙伴经常会问:**该如何学习入门Go语言?怎样提升Go语言Coding水平?**这篇文章我们就使用DeepSeek来梳理下Go语言在AI时代的学习路线。 向DeepSeek提问的问题原文: 你现在是一名资深的Go语言工程师,精通Go语言并…...
OpenGL 04--GLSL、数据类型、Uniform、着色器类
一、着色器 在 OpenGL 中,着色器(Shader)是运行在 GPU 上的程序,用于处理图形渲染管线中的不同阶段。 这些小程序为图形渲染管线的某个特定部分而运行。从基本意义上来说,着色器只是一种把输入转化为输出的程序。着色器…...
仅需三分钟,使用Vue3.x版本组件式风格实现一个消息提示组件!
一、前言 在日常的前端项目开发中,我们时常需要使用到“消息提示”(以下简称“消息”)这个组件来帮助我们更好的给予用户提示,例如常见的“登录成功”、“操作成功”、“服务器异常”等等提示。 尽管市面上已经有一些组件库提供了…...
天猫代运营公司推荐:品融电商
天猫代运营公司推荐:品融电商 在电商行业竞争日益激烈的今天,选择一家专业的天猫代运营公司成为众多品牌商家提升市场竞争力、实现销售增长的关键。在众多代运营公司中,品融电商凭借其专业的团队、丰富的经验和显著的成功案例,脱…...
2025.2.26#Java开发中的鉴权机制详解
1、Java开发中的鉴权机制详解 用户问的是在Java开发中什么是鉴权。首先,我需要明确鉴权的定义。鉴权,也就是认证授权,是确认用户身份和权限的过程。可能用户刚接触安全相关的内容,需要简单明了的解释。 接下来,我应该分…...
AF3 DataPipeline类解读
AlphaFold3 的DataPipeline类处理单链蛋白数据,主要负责从不同数据源(FASTA、PDB、mmCIF、ProteinNet .core 文件等)解析输入序列、MSA、模板匹配、序列嵌入(如 ESM-2)、并将其转换为 AlphaFold3 可用的特征格式。 源代码: class DataPipeline:"""Assem…...
Windows系统PyTorch环境配置
0、前言 深度学习为什么要配置GPU? GPU(图形处理单元)最初是为图形渲染而设计的,它们擅长处理大量并行计算任务。深度学习模型,特别是卷积神经网络(CNN)和循环神经网络(RNN…...
策略模式环境类的实现方式对比
文章目录 1、策略模式2、聚合策略类实现方式一3、聚合策略类实现方式二4、对比5、补充:ApplicationContextAware接口 1、策略模式 近期工作中,需要处理4.x和5.x两个版本的数据,所以自然想到的是策略模式,写一个抽象类,…...
深入理解JVM的运行时数据区
🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,…...
mapbox基础,加载background背景图层
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象…...
14.二叉搜索树
二叉搜索树 1.概念 ⼆叉搜索树⼜称⼆叉排序树,它或者是⼀棵空树,或者是具有以下性质的⼆叉树: *若它的左⼦树不为空,则左⼦树上所有结点的值都⼩于等于根结点的值 *若它的右⼦树不为空,则右⼦树上所有结点的值都⼤于等于根结点…...
javascript-es6 (五)
内置构造函数 在 JavaScript 中 最主要 的数据类型有 6 种: 基本数据类型: 字符串、数值、布尔、undefined、null 引用类型: 对象 但是,我们会发现有些特殊情况: //普通字符串 const str peiqi console.log(str.length) //…...
飞鱼科技游戏策划岗内推
协助策划完成相关工作,包括但不仅限于策划配置,资料搜集,游戏体验; 游戏策划相关作品;游戏大赛经历;游戏demo制作经历;游戏公司策划岗位实习经历优先 内推码 DSZP7YFU...
探索浮点数在内存中的存储(附带快速计算补码转十进制)
目录 一、浮点数在内存中的存储 1、常见的浮点数: 2、浮点数存储规则: 3、内存中无法精确存储: 4、移码与指数位E: 5、指数E的三种情况: 二、快速计算补码转十进制 1、第一种方法讨论: 2、第二种方…...
elfk+zookeeper+kafka数据流
申请7台部署elfkzookeeperkafka 数据流: filebeat(每台app) ------>【logstash(2) kafka(3)】 -------> logstash(1) -------> 【elasticsearch(3) kibana(1)】...
汽车悬架系统技术演进:从被动到全主动的革新之路(主动悬架类型对比)
在汽车工业的百年发展史中,悬架系统始终是平衡车辆性能与舒适性的关键战场。随着消费者对驾乘体验要求的不断提升,传统被动悬架已难以满足中高端车型的需求,而半主动与全主动悬架技术的崛起,正在重塑行业格局。本文将深入解析三大…...
什么限制了LLM:空间复杂度限制
什么限制了LLM: 空间复杂度限制 空间复杂度是对一个算法在运行过程中临时占用存储空间大小的量度,它描述了算法所需的额外存储空间与输入数据规模之间的增长关系。这里的存储空间主要包括算法执行过程中所使用的变量、数据结构、栈空间等。和时间复杂度类似,空间复杂度通常也…...
Docker02 - 深入理解Docker
深入理解Docker 文章目录 深入理解Docker一:Docker镜像原理1:镜像加载原理1.1:unionFS1.2:加载原理 2:分层理解 二:容器数据卷详解1:什么是容器数据卷2:使用数据卷3:具名…...
深度学习中卷积层(Conv)、BN层(Batch Normalization)和 ReLU层(Rectified Linear Unit)的详细介绍
一、卷积层(Conv) 定义 卷积层是深度学习中卷积神经网络(CNN)的核心组成部分。它通过对输入数据(如图像)进行卷积操作来提取特征。卷积操作是用一个卷积核(也称为滤波器)在输入数据上…...