如何使用@KafkaListener实现从nacos中动态获取监听的topic
1、简介
对于经常需要变更kafka主题的场景,为了实现动态监听topic的功能,可以使用以下方式。
2、使用步骤
2.1、添加依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.1</version>
</dependency>
2.2、nacos中配置
# kafka 配置
spring:kafka:bootstrap-servers: ip地址:9092topics: topic1,tpic2producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerenable-idempotence: trueacks: alltransactional-id: kafka-groupconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: kafka-clickhouse-groupauto-offset-reset: latestenable-auto-commit: falseisolation-level: read_committedallow-auto-create-topics: truelistener:ack-mode: MANUAL_IMMEDIATEconcurrency: 3
2.3、配置类
package org.aecsi.kafkadatatock.config;import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;@Configuration
@RequiredArgsConstructor
@EnableKafka
@RefreshScope
public class KafkaConfig {private final KafkaProperties kafkaProperties;@Beanpublic KafkaAdmin kafkaAdmin() {return new KafkaAdmin(kafkaProperties.buildAdminProperties());}@Beanpublic AdminClient adminClient(KafkaAdmin kafkaAdmin) {return AdminClient.create(kafkaAdmin.getConfigurationProperties());}@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildProducerProperties());configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);configProps.put(ProducerConfig.ACKS_CONFIG, "all");configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-clickhouse-producer");DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(configProps);factory.setTransactionIdPrefix("kafka-clickhouse-producer-");return factory;}@Beanpublic KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {return new KafkaTemplate<>(producerFactory);}@Bean@RefreshScopepublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>(kafkaProperties.buildConsumerProperties());configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true);configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory,KafkaTransactionManager<String, String> transactionManager) {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setTransactionManager(transactionManager);return factory;}@Bean@RefreshScopepublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setAutoStartup(true);return factory;}@Beanpublic ApplicationRunner kafkaListenerStarter(KafkaListenerEndpointRegistry registry) {return args -> {// 启动所有 Kafka 监听器registry.start();};}
}
接收消息类
@KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}", autoStartup = "false")@Transactional(transactionManager = "transactionManager")public void processMessage(ConsumerRecord<String, String> record,Acknowledgment acknowledgment,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {try {log.info("kafka 接受 topic: {} 消息", topic);
// 处理消息acknowledgment.acknowledge();} catch (Exception e) {log.error("Error processing message for topic {}: {}", topic, e.getMessage());throw e;}}
主启动类添加一个注解
@EnableConfigurationProperties(KafkaProperties.class)
3、总结
实现kafka动态获取topic还有其他方式,博主目前只验证这一种,其他方式待更新。
相关文章:
如何使用@KafkaListener实现从nacos中动态获取监听的topic
1、简介 对于经常需要变更kafka主题的场景,为了实现动态监听topic的功能,可以使用以下方式。 2、使用步骤 2.1、添加依赖 <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactI…...
区块链如何达成共识:PoW/PoS/DPoS的原理、争议与适用场景全解
引言 区块链技术的核心在于去中心化网络中的信任机制,而共识算法是实现这一目标的关键。不同的共识机制在效率、安全性、去中心化程度等方面各有取舍。本文将深入解析三种主流共识机制——工作量证明(PoW)、权益证明&am…...
Oracle for Linux安装和配置(11)——Oracle安装和配置
11.3. Oracle安装和配置 Linux上Oracle的安装及配置与Windows上差不多,只是安装软件的准备等有所不同,下面只对不同于Windows的部分进行较为详细的讲解,其他类似部分不再赘述。另外,无论选择使用虚机还是物理机,Oracle安装、配置和使用等方面几乎都是完全一样的。 11.3.…...
http协议详解附带c/c++服务器和客户端源码
http详解 代码部分HTTP请求结构示例HTTP客户端实现(使用Linux系统调用)简易HTTP服务器实现 深入解析HTTP协议:从基础到实践1. HTTP协议核心概念1.1 协议本质解读1.2 通信模型详解 2. 抓包分析实战2.1 Fiddler工具妙用2.2 报文结构拆解 3. 请求…...
JavaScript性能优化实战(5):数据结构与算法性能优化
JavaScript中常用数据结构性能对比 数据结构的选择对JavaScript应用的性能有着决定性的影响。不同的数据结构在不同操作上各有优劣,选择合适的数据结构能显著提升应用性能。本节将对JavaScript中常用的数据结构进行全面的性能对比分析。 基本数据结构时间复杂度概览 首先,…...
uniapp小程序开发入门01-快速搭建一个空白的项目并预览它
uniapp小程序开发入门01-快速搭建一个空白的项目并预览它!由于近期有市场需求和计划,构建一套自己的小程序,所以再次带领大家系统的过一遍,如何使用uniapp程序快速构建一套完整的项目。今天是第一小节,带领大家快速构建…...
UR5 UR5e机器人URDF文件
URDF全称为Unified Robot Description Format,中文可以翻译为“统一机器人描述格式”。与计算机文件中的.txt文本格式、.jpg图像格式等类似,URDF是一种基于XML规范、用于描述机器人结构的格式。根据该格式的设计者所言,设计这一格式的目的在于提供一种尽可能通用(as genera…...
ubuntu20.04安装x11vnc远程桌面
x11vnc是一个VNC服务器, 安装后我们可以不依赖外部的显示设备, 通过网络远程登录ubuntu桌面。 安装x11vnc sudo apt-get install x11vnc 设置VNC登录密码 sudo x11vnc -storepasswd /etc/x11vnc.pwd 设置x11vnc在开机时自动启动 新建如下文件: sudo vi /lib/systemd/sys…...
AKM旭化成微电子全新推出能量收集IC“AP4413系列”
旭化成微电子开始批量生产用于环保发电的电荷控制集成电路!优化充电电池的充放电,广泛应用于智能遥控器和蓝牙TMTag等设备。 01 概述 旭化成微电子株式会社(AKM)开发出面向小型二次电池(充电电池)的环境…...
机器人行业研究系列报告
新质生产力系列报告:2024年人形机器人核心场景发展洞察研究报告 具身机器人行业现状及未来趋势分析 2025 2025年人形机器人投资策略,量产元年,全球共振,百家争鸣 人形机器人行业深度报告(一):…...
利用JMeter代理服务器方式实现高效压测
前言 在当今快节奏的互联网时代,确保Web应用和服务能够在高负载下稳定运行变得至关重要。无论是电子商务平台、社交媒体网络还是在线教育服务,用户对网站响应速度和稳定性的期望从未如此之高。因此,性能测试不再是一个可选项,而是…...
NLP高频面试题(五十五)——DeepSeek系列概览与发展背景
大型模型浪潮背景 近年来,大型语言模型(Large Language Model, LLM)领域发展迅猛,从GPT-3等超大规模模型的崛起到ChatGPT的横空出世,再到GPT-4的问世,模型参数规模和训练数据量呈指数级增长。以GPT-3为例,参数高达1750亿,在570GB文本数据上训练,显示出模型规模、数据…...
2015-2023 各省 GDP 数据,用QuickBI 进行数据可视化——堆叠图!
嘿,数据爱好者们!今天咱要来一场刺激的数据冒险,深入剖析全国各省的 GDP 数据,而且会借助强大的 QuickBI 工具,用超酷炫的堆叠图让这些数据 “活” 起来,带你一眼看清经济格局! 地址࿱…...
MySQL优化(持续更新)笔记
一、insert优化 : 之前:项目通常是一条insert一条的执行,每一次都需要与MySQL进行建立连接进行网络传输,效率很低 现在: 1.- 批量插入(一条sql就行,一次500-1000) 可以与MyBatis…...
MySQL表的操作 -- 表的增删改查
目录 1. 表的创建2. 表的查看3. 表的修改4. 表的删除5. 总结 1. 表的创建 1.查看字符集及效验规则 2. 表的创建 CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎;创建用户表1 创建用…...
Java 数组:深度解析
前言 数组作为Java中最基础也是最强大的数据结构之一,其高效性和灵活性在性能关键型应用中无可替代。本文将从进阶使用开始,逐步深入探索Java数组的高级特性和大师级技巧,帮助开发者全面掌握数组技术的精髓。 一、数组基础回顾与性能特性 1.1 数组基本特性对比 特性Java数…...
【基于Qt的QQMusic项目演示第一章】从界面交互到核心功能实现
🌹 作者: 云小逸 🤟 个人主页: 云小逸的主页 🤟 motto: 要敢于一个人默默的面对自己,强大自己才是核心。不要等到什么都没有了,才下定决心去做。种一颗树,最好的时间是十年前,其次就是现在&…...
[Mybatis-plus]
简介 MyBatis-Plus (简称 MP)是一个 MyBatis的增强工具,在 MyBatis 的基础上只做增强不做改变。Mybatis-plus官网地址 注意,在引入了mybatis-plus之后,不要再额外引入mybatis和mybatis-spring,避免因为版本…...
【EDA】EDA中聚类(Clustering)和划分(Partitioning)的应用场景
在VLSI物理设计自动化中,聚类(Clustering)和划分(Partitioning)是两个互补但目标和应用场景截然不同的关键步骤,其核心区别如下: 一、应用阶段与核心目标 1. 聚类(Clustering&…...
PySide与PyQt对比:为何PySide是更优选择
PySide与PyQt对比:为何PySide是更优选择 引言 在Python桌面应用开发领域,Qt框架的绑定库一直是首选方案。两大主要选择—PySide和PyQt,虽然功能相似,但在许可证、性能和支持方面存在显著差异。本文将深入探讨为何PySide通常是更…...
LVGL移植高通矢量字库GT5SLAD3BFA
字库芯片: GT5SLAD3BFA MCU: STM32F429 LVGL版本:V8.4 一,实现gt_read_data() gt_read_data()函数的作用:与字库flash进行通信,函数的定义里调用spi发送数据和接收数据的接口。用户只需要实现该函数,就可以…...
7.0 sharpScada的sql数据的安装
本文介绍开源库SharpScada的配置过程。 1,还原数据库 2.打开SQL server2014配置启动器,并启用Named Pipes,以及TCP/IP 3.启动SQL Server服务中的SQL Server Browser 4.允许远程连接...
杂项知识点
杂项 1 激活函数1.1 sigmoid1.2 tanh1.3 Relu1.4 leakRelu 1 激活函数 常用的激活函数包括sigmoid tanh Relu leakRelu 1.1 sigmoid import torch import numpy as np import matplotlib.pyplot as plt # sigmoid tanh Relu leakRelu ## 1 sigmoid ### 1.1 代码复现sig…...
Android项目升级插件到kotlin 2.1.0后混淆网络请求异常
背景 项目kt插件1.9.24升级到2.1.0后打包编译release网络请求失败了。 retrofit版本2.9.0 错误详情 java.lang.ClassCastException: java.lang.Class cannot be cast to java.lang.reflect.ParameterizedTypeat retrofit2.m.a(Unknown Source:2477)at retrofit2.K.invoke(U…...
uniapp 仿企微左边公司切换页
示例代码: <template><view class"container"><!-- 遮罩层 --><view class"mask" v-if"showSidebar" click"closeSidebar"></view><!-- 侧边栏 --><view class"sidebar"…...
Milvus(7):Schema、主字段和自动识别
1 Schema Schema 定义了 Collections 的数据结构。在创建一个 Collection 之前,你需要设计出它的 Schema。本页将帮助你理解 Collections 模式,并自行设计一个示例模式。 在 Zilliz Cloud 上,Collection Schema 是关系数据库中一个表的组合&a…...
Liunx服务上MySQL服务导致CPU炸了,使用kill -9 mysqld进程id后,无法启动MySQL
1.top命令后,可以看到mysqld沾满了cpu 2.然后我使用了kill -9 16594,杀死了mysqld进程 3.之后,查看mysql服务状态,发现对应的 www/serve/mysqld 目录不存在 sudo systemctl status mysqld4.使用命令查看操作 www/serve 目录的历…...
Java使用IText7动态生成带审批文本框的PDF文档
Java使用IText7动态生成带审批文本框的PDF文档 文章目录 Java使用IText7动态生成带审批文本框的PDF文档1.构建第一个框的起始坐标2.渲染第一个框3.渲染其他的审批框 测试结果示例 实现思路 使用Canvas进行相对定位和绝对定位来确定文本框内文字位置,用Rectangle通…...
【音视频】AVIO输入模式
内存IO模式 AVIOContext *avio_alloc_context( unsigned char *buffer, int buffer_size, int write_flag, void *opaque, int (*read_packet)(void *opaque, uint8_t *buf, int buf_size), int (*write_packet)(void *opaque, uint8_t *buf, int buf_size), int64_t (*seek)(…...
Android中的多线程
线程池 在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要开一个新线程去执行,则这些线程的创建和销毁将消耗大量的资源。并且线程都是“各自为政”,很难对其进行控制,…...
http://noi.openjudge.cn/——2.5基本算法之搜索——200:Solitaire
文章目录 题目宽搜代码总结 题目 总时间限制: 5000ms 单个测试点时间限制: 1000ms 内存限制: 65536kB 描述 Solitaire is a game played on a chessboard 8x8. The rows and columns of the chessboard are numbered from 1 to 8, from the top to the bottom and from left t…...
deep鼠标跟随插件
效果图 实现 首先打开深度系统终端,键入以下安装命令: sudo apt install oneko安装完成后,执行以下命令启动: oneko启动后,就会出现小猫咪,如果终端不关(服务不关),会…...
Verilog 语法 (二)
在掌握了 Verilog 的基础语法和常用程序框架之后,本节将带大家深入学习一些 高级设计知识点。这些内容包括: 阻塞赋值()与非阻塞赋值(<)的区别及使用场景; assign 和 always 语句的差异&am…...
大数据开发环境的安装,配置(Hadoop)
1. 三台linux服务器的安装 1. 安装VMware VMware虚拟机软件是一个“虚拟PC”软件,它使你可以在一台机器上同时运行二个或更多Windows、DOS、LINUX系统。与“多启动”系统相比,VMWare采用了完全不同的概念。 我们可以通过VMware来安装我们的linux虚拟机…...
唯创安全:从传统到智能,工厂智能叉车AI防撞系统解决方案
一、叉车安全管理现状痛点分析 1、司机管理难题 • 违规操作频发:无证驾驶、疲劳驾驶(如作业中吸烟/玩手机)及不系安全带现象普遍,事故风险与法律风险双高。 • 资源分配失衡:未经授权使用导致车辆调度混乱,影响作业效率。 2、…...
Windows与CasaOS跨平台文件同步:SyncThing本地部署与同步配置流程
文章目录 前言1. 添加镜像源2. 应用安装测试3. 安装syncthing3.1 更新应用中心3.2 SyncThing安装与配置3.3 Syncthing使用演示 4. 安装内网穿透工具5. 配置公网地址6. 配置固定公网地址 推荐 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽…...
基于Django的个性化股票交易管理系统
本项目基于Python3.6、Django2.1、MySql8.0(最好不要使用5.6,字符集等方面均不兼容,否则导入数据库会出错)与股票信息工具包TuShare实现。 创建或激活对应Python开发环境 这里使用了conda来管理环境,强烈推荐…...
Python图像变清晰与锐化,调整对比度,高斯滤波除躁,卷积锐化,中值滤波钝化,神经网络变清晰
本次使用图片来源于百度 import cv2 import time import numpy as np import pywtfrom PIL import Image, ImageEnhance#-i https://pypi.mirrors.ustc.edu.cn/simpledef super_resolution(input_path, output_path, model_path, scale4):# 初始化超分辨率模型sr cv2.dnn_su…...
带根线就无敌?光纤无人机如何成为电子战的终结者
在硝烟弥漫的俄乌战场上,一条超细光缆正在悄然改变战争规则。2024年俄军首次在前线部署光纤FPV无人机,其通过释放光纤线缆传输数据,成功对乌军装甲目标实施精准打击。乌方同时也迅速跟进,于 2025 年初量产光纤FPV 无人机以突破俄军…...
windows 和ubuntu静态路由配置
目录 windows 1 查看当前路由表 2 添加静态路由 3 删除路由 ubuntu route命令(传统方式) 使用ip指令(推荐) ubuntu永久路由配置 子网掩码解释 windows 1 查看当前路由表 -4 只关注ipv4,-6 用于指定显示 IPv6 …...
《深入理解计算机系统》阅读笔记之第四章 处理器体系结构
概述 备注:怎么感觉讲的还是《编码》这本书里面提到的知识点?...
vue项目前后端分离设计
在Vue前端架构中,通过分层结构和模块化设计实现高效的前后端分离,需要系统性规划各层职责、接口管理和数据流控制。以下是结合业界最佳实践的完整方案: 一、分层架构设计 1. 分层结构(自上而下) 层级职责示例技术实现…...
Steam游戏服务器攻防全景解读——如何构建游戏级抗DDoS防御体系?
Steam游戏服务器的DDoS攻防体系设计,从协议层漏洞利用到业务连续性保障,深度拆解反射型攻击、TCP状态耗尽等7类威胁场景。基于全球15个游戏厂商攻防实战数据,提供包含边缘节点调度、AI流量指纹识别、SteamCMD加固配置的三维防护方案ÿ…...
七、web自动化测试03
目录 一、xpath定位1. 属性定位2.属性与逻辑结合3. 属性与层级结合 二、cookie1. 验证码处理方案2. cookie3. 案例:cookie跳过登录 三、pytest1. 介绍及安装2. 定义用例3. 执行测试用例3.1 命令行运行3.2 配置文件运行3.3 项目配置文件config.py 4. 参数化5. 断言6.…...
企业级AI开发利器:Spring AI框架深度解析与实战
企业级AI开发利器:Spring AI框架深度解析与实战 一、前言:Java生态的AI新纪元 在人工智能技术爆发式发展的今天,Java开发者面临着一个新的挑战:如何将大语言模型(LLMs)和生成式AI(GenAI&#…...
docker-compose安装RustDesk远程工具
以下是使用 docker-compose 部署 RustDesk 服务端(ID服务器 hbbs + 中继服务器 hbbr)的完整流程: 1. 创建 docker-compose.yml mkdir -p ~/rustdesk && cd ~/rustdesk vi docker-compose.ymlversion: 3.8services...
Qt基础009(HTTP编程和QJSON)
文章目录 软件开发网络架构BS架构/CS架构 HTTP基本概念QT的HTTP编程JSON数据概述QT生成JSON数据QT解析JSON数据 软件开发网络架构 BS架构/CS架构 在计算机网络和软件开发中,CS架构(Client-Server Architecture,客户端-服务器架构&#x…...
学习整理在centos7上安装mysql8.0版本教程
学习整理在centos7上安装mysql8.0版本教程 查看linux系统版本下载mysql数据库安装环境检查解压mysql安装包创建MySQL需要的目录及授权新增用户组新增组用户配置mysql环境变量编写MySQL配置文件初始化数据库初始化msyql服务启动mysql修改初始化密码配置Linux 系统服务工具,使My…...
第R4周:LSTM-火灾温度预测
文章目录 一、前期准备工作1.导入数据2. 数据集可视化 二、构建数据集1. 数据集预处理2. 设置X, y3. 划分数据集 三、模型训练1. 构建模型2. 定义训练函数3. 定义测试函数4. 正式训练模型 四、模型评估1. Loss图片2. 调用模型进行预测3. R2值评估 总结: ἶ…...
Linux文件管理完全指南:从命名规则到压缩解压
一、文件命名规则:避免踩坑的关键 1. 允许的字符与命名建议 允许字符:除 / 外所有字符均可使用,但需避免 <, >, ?, * 等特殊符号。 命名建议: 统一使用小写字母(Linux严格区分大小写)。 用下划线…...