SpringBoot 整合 Avro 与 Kafka 详解
SpringBoot 整合 Avro 与 Kafka 详解
在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数据格式。将这两者结合,并通过 Spring Boot 进行整合,可以构建出高效、可扩展的实时数据处理系统。
一、环境准备
在开始整合之前,需要准备好以下环境:
- Java:确保已经安装了 JDK,推荐使用 JDK 8 或更高版本。
- Maven:用于管理项目的依赖和构建过程。
- Spring Boot:作为项目的框架,推荐使用较新的版本,如 Spring Boot 2.x。
- Kafka:确保 Kafka 已经安装并运行,可以使用 Docker 部署 Kafka 集群。
- Avro:Avro 依赖 JSON 定义的架构来序列化数据。
二、项目结构
一个典型的 Spring Boot 项目结构可能如下:
spring-boot-kafka-avro
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ ├── SpringBootKafkaAvroApplication.java
│ │ │ ├── config
│ │ │ │ └── KafkaConfig.java
│ │ │ ├── producer
│ │ │ │ └── KafkaProducer.java
│ │ │ ├── consumer
│ │ │ │ └── KafkaConsumer.java
│ │ │ └── model
│ │ │ └── ElectronicsPackage.java (由 Avro 自动生成)
│ │ ├── resources
│ │ │ ├── application.properties
│ │ │ └── avro
│ │ │ └── electronicsPackage.avsc (Avro 架构文件)
│ └── test
│ └── java
│ └── com
│ └── example
│ └── SpringBootKafkaAvroApplicationTests.java
└── pom.xml
三、添加依赖
在 pom.xml
文件中添加必要的依赖:
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.11.0</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro Maven Plugin --><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>${avro.version}</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory><outputDirectory>${project.build.directory}/generated/avro</outputDirectory></configuration></execution></executions></plugin>
</dependencies>
四、定义 Avro 架构
在 src/main/resources/avro/
目录下创建一个 Avro 架构文件 electronicsPackage.avsc
:
{"namespace": "com.example.model","type": "record","name": "ElectronicsPackage","fields": [{"name": "package_number", "type": ["string", "null"], "default": null},{"name": "frs_site_code", "type": ["string", "null"], "default": null},{"name": "frs_site_code_type", "type": ["string", "null"], "default": null}]
}
这个架构文件定义了 ElectronicsPackage
类,包括三个字段:package_number
、frs_site_code
和 frs_site_code_type
。
五、生成 Avro 类
运行 Maven 构建过程,Avro Maven 插件会根据 electronicsPackage.avsc
文件生成相应的 Java 类 ElectronicsPackage.java
。
六、配置 Kafka
在 application.properties
文件中配置 Kafka 的相关属性:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.config.AvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.config.AvroDeserializer
注意,这里指定了自定义的 AvroSerializer
和 AvroDeserializer
类。
七、实现 Avro 序列化器和反序列化器
创建 AvroSerializer
和 AvroDeserializer
类,用于 Avro 数据的序列化和反序列化。
// AvroSerializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class AvroSerializer<T extends SpecificRecord> implements Serializer<T> {private final DatumWriter<T> writer;public AvroSerializer(Class<T> type) {this.writer = new SpecificDatumWriter<>(type);}@Overridepublic byte[] serialize(String topic, T data) {if (data == null) {return null;}ByteArrayOutputStream out = new ByteArrayOutputStream();Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);try {writer.write(data, encoder);encoder.flush();out.close();} catch (IOException e) {throw new RuntimeException(e);}return out.toByteArray();}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op}@Overridepublic void close() {// No-op}
}// AvroDeserializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;public class AvroDeserializer<T extends SpecificRecord> implements Deserializer<T> {private final Class<T> type;private final DatumReader<T> reader;public AvroDeserializer(Class<T> type) {this.type = type;this.reader = new SpecificDatumReader<>(type);}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteArrayInputStream in = new ByteArrayInputStream(data);Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);try {return reader.read(null, decoder);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op
相关文章:
SpringBoot 整合 Avro 与 Kafka 详解
SpringBoot 整合 Avro 与 Kafka 详解 在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数…...
windows C#-使用 Override 和 New 关键字(上)
在 C# 中,派生类中的方法可具有与基类中的方法相同的名称。 可使用 new 和 override 关键字指定方法的交互方式。 override 修饰符用于扩展基类 virtual 方法,而 new 修饰符用于隐藏可访问的基类方法 。 在控制台应用程序中,声明以下两个类…...
FaRM译文
No compromises: distributed transactions with consistency, availability, and performance Aleksandar Dragojevic, Dushyanth Narayanan, Edmund B. Nightingale, Matthew Renzelmann, Alex Shamis, Anirudh Badam, Miguel Castro Microsoft Research 摘要 具有强一致…...
大数据新视界 -- Hive 元数据管理:核心元数据的深度解析(上)(27 / 30)
💖💖💖亲爱的朋友们,热烈欢迎你们来到 青云交的博客!能与你们在此邂逅,我满心欢喜,深感无比荣幸。在这个瞬息万变的时代,我们每个人都在苦苦追寻一处能让心灵安然栖息的港湾。而 我的…...
大数据项目-Django基于聚类算法实现的房屋售房数据分析及可视化系统
《[含文档PPT源码等]精品Django基于聚类算法实现的房屋售房数据分析及可视化系统》该项目含有源码、文档、PPT、配套开发软件、软件安装教程课程答疑等! 数据库管理工具:phpstudy/Navicat或者phpstudy/sqlyog 后台管理系统涉及技术: 后台使…...
当大的div中有六个小的div,上面三个下面三个,当外层div高变大的时候我希望里面的小的div的高也变大
问: 当大的div中有六个小的div,上面三个下面三个,当外层div高变大的时候我希望里面的小的div的高也变大 回答: 这时候我们就不能写死六个小的div的高度,否则上下的小的div的间距就会变大,因为他们的高度…...
使用 Postman 上传二进制类型的图片到后端接口写法
我们有的时候会有需求,就是通过 postman 传递二进制图片到后端接口,如下图: 那我们的 Java 接口需要怎么写呢? Spring Boot 接收这些数据的方式需要使用 RequestBody 注解来处理原始的二进制数据(byte[])。…...
字符串函数和内存函数
字符串函数 1、strlcpy 【字符串拷贝】 (将原字符串中的字符拷贝到目标字符数组中,包括终止符号\0,并在这里停止;为了避免越界,目标字符串数组应该足够大去接收)👆 (返回值是 dest…...
uC/OSII学习笔记(一)任务的增删改查
使用天玛智控的控制器,基础工程文件已移植ucosii。 正常的任务创建流程为: 1.OSInit(); 2.OSTaskCreate(); 3.OSStart(); 但是天玛对其有做修改,任务创建直接调用OSTaskCreate()函数即可,不用在…...
如何搭建JMeter分布式集群环境来进行性能测试
在性能测试中,当面对海量用户请求的压力测试时,单机模式的JMeter往往力不从心。如何通过分布式集群环境,充分发挥JMeter的性能测试能力?这正是许多测试工程师在面临高并发、海量数据时最关注的问题。那么,如何轻松搭建…...
蓝桥杯准备训练(lesson2 ,c++)
3.1 字符型 char //character的缩写在键盘上可以敲出各种字符,如: a , q , , # 等,这些符号都被称为字符,字符是⽤单引号括 起来的,如: ‘a’ , ‘b’ &…...
【踩坑】Collectors.toMap 抛出 NullPointerException 异常
1. 场景重现 public class Test01 {public static void main(String[] args) {List<Person> list Arrays.asList(new Person("anna", 17, 0), new Person("bob", 18, 1), new Person("jack", 20, null));Map<String, Integer> nam…...
泷羽sec专题课笔记-- Linux作业--开机自启动方法以及破解
本笔记为 泷羽sec 《红队全栈课程》学习笔记,课程请可自行前往B站学习,课程/笔记主要涉及网络安全相关知识、系统以及工具的介绍等,请使用该课程、本笔记以及课程和笔记中提及工具的读者,遵守网络安全相关法律法规,切勿…...
OpenCV
MFC(C)的使用 1、官网下载 https://opencv.org/ 选 Library - Release - 选择你需要的版本 2、安装 3、配置环境变量 将 OpenCV 的bin目录 C:\Program Files\OpenCV481\opencv\build\bin添加到系统的PATH环境变量中。这使得在运行程序时能够找到 Open…...
Wwise 使用MIDI文件、采样音频
第一种:当采样音频只有一个文件的时候 1.拖入MIDI文件到Interactive Music Hierarchy层级 2.拖入采样音频到Actor-Mixer Hierarchy层级 3.勾选MIDI显示出面板,设置Root Note与采样音频音高相同,这里是C#5 4.播放测试,成功&…...
OpenStack-Glance组件
Glance Glance使用磁盘格式和容器格式基础配置镜像转换 Glance 是 OpenStack 的镜像服务,负责存储、发现和管理虚拟机镜像。它允许用户创建和共享镜像,用于启动虚拟机实例。 Glance 的主要功能 (1)虚拟机镜像的管理 支持镜像的上…...
写译热点单词 | 50篇文章整理 | 手敲自用
目录 文化类 政治类 经济类 教育类 科技类 健康类 安全类 体育类 第二版 删去了部分不太常用的 文化类 1. 阴历: lunar calendar 2. 阳历: solar calendar 3. 春节: the Spring Festival 4. 除夕: Chinese New Year’s Eve 5. 清明节: Tomb Sweeping Day 6. 重阳…...
【UE5 C++】判断两点连线是否穿过球体
目录 前言 方法一 原理 代码 测试 结果 方法二 原理 一、检查连线与球体的相交情况 二、检查距离与球体半径的关系 三、检查连线与球体的相交 代码 前言 通过数学原理判断空间中任意两点的连线是否穿过球体,再通过射线检测检验算法的正确性。 方法一 …...
A1228 php+Mysql旅游供需平台的设计与实现 导游接单 旅游订单 旅游分享网站 thinkphp框架 源码 配置 文档 全套资料
旅游供需平台 1.项目描述2. 开发背景与意义3.项目功能4.界面展示5.源码获取 1.项目描述 随着社会经济的快速发展,生活水平的提高,人们对旅游的需求日益增强,因此,为给用户提供一个便利的查看导游信息,进行导游招募的平…...
【linux】服务器Ubuntu20.04安装cuda11.8教程
【linux】服务器Ubuntu20.04安装cuda11.8教程 文章目录 【linux】服务器Ubuntu20.04安装cuda11.8教程到官网找到对应版本下载链接终端操作cudnn安装到官网下载下载后解压进入解压后的目录:将头文件复制到 /usr/local/cuda/include/ 目录:将库文件复制到 …...
SpringMVC其他扩展
一、全局异常处理机制: 1.异常处理两种方式: 开发过程中是不可避免地会出现各种异常情况的,例如网络连接异常、数据格式异常、空指针异常等等。异常的出现可能导致程序的运行出现问题,甚至直接导致程序崩溃。因此,在开发过程中,…...
用“*”构成一个倒三角形:JAVA
输入:5 输出: ******* ***** *** * 代码: import java.util.Scanner; //倒三角 public class FF6 {public static void main(String[] args) {Scanner scannernew Scanner(System.in);while (scanner.hasNextInt()){int nscanner…...
洛谷P2670扫雷游戏(Java)
三.P2670 [NOIP2015 普及组] 扫雷游戏 题目背景 NOIP2015 普及组 T2 题目描述 扫雷游戏是一款十分经典的单机小游戏。在 n 行 m列的雷区中有一些格子含有地雷(称之为地雷格),其他格子不含地雷(称之为非地雷格)。玩…...
Windows 11 环境下 条码阅读器输入到记事本的内容不完整
使用Windows11时,为什么记事本应用程序中的扫描数据被截断或不完整?为什么sdo 特殊字符的显示与Windows 10 记事本应用程序不同? 很多人认为和中文输入法有关,其实主要问题出在这个windows11下的记事本程序上,大家知道这个就可以了&#x…...
C# 动态类型 Dynamic
文章目录 前言1. 什么是 Dynamic?2. 声明 Dynamic 变量3. Dynamic 的运行时类型检查4. 动态类型与反射的对比5. 使用 Dynamic 进行动态方法调用6. Dynamic 与 原生类型的兼容性7. 动态与 LINQ 的结合8. 结合 DLR 特性9. 动态类型的性能考虑10. 何时使用 Dynamic&…...
设计模式10:观察者模式(订阅-发布)
系列总链接:《大话设计模式》学习记录_net 大话设计-CSDN博客 参考:简说设计模式——工厂方法模式 - JAdam - 博客园 参考:简单工厂模式(Simple Factory Pattern) - 回忆酿的甜 - 博客园 一:概述 观察者模式࿰…...
2020 年 12 月青少年软编等考 C 语言四级真题解析
目录 T1. 开餐馆思路分析T2. 邮票收集思路分析T3. 带通配符的字符串匹配思路分析T4. 删除数字思路分析T1. 开餐馆 北大信息学院的同学小明毕业之后打算创业开餐馆。现在共有 n n n 个地点可供选择。小明打算从中选择合适的位置开设一些餐馆。这 n n n 个地点排列在同一条直线…...
高级java每日一道面试题-2024年12月03日-JVM篇-什么是Stop The World? 什么是OopMap? 什么是安全点?
如果有遗漏,评论区告诉我进行补充 面试官: 什么是Stop The World? 什么是OopMap? 什么是安全点? 我回答: 在Java虚拟机(JVM)中,Stop The World、OopMap 和 安全点 是与垃圾回收(GC)和性能优化密切相关的概念。理…...
探索 Apache Commons Collections 4:Java 集合框架的强大扩展
在 Java 开发中,集合框架是处理数据的核心工具。然而,标准 Java 集合框架虽然功能强大,但在某些场景下仍显得不够灵活。Apache Commons Collections 4(以下简称 commons-collections4)作为一个强大的工具库,…...
NIO(New IO)和BIO(Blocking IO)的区别
Java中的NIO(New IO)和BIO(Blocking IO)的区别及NIO的核心组件 Java中的NIO(New IO)和BIO(Blocking IO)是两种不同的网络通信模型,各自具有独特的特性和适用场景。下面将…...
【串口助手开发】visual studio 使用C#开发串口助手,生成在其他电脑上可执行文件,可运行的程序
1、改成Release,生成解决方案 串口助手调试成功后,将Debug改为Release,点击生成解决方案 2、运行exe文件 生成解决方案后,在bin文件夹下, Release文件夹下,生成相关文件 复制一整个Release文件夹…...
Linux 编译 convert_geotiff 时遇到的几个问题
步骤1:安装libgeotiff-dev 在ubuntu上,安装命令为: sudo apt-get install libgeotiff-dev在macos上,安装命令为: brew install libgeotiff在Linux上安装命令为: sudo yum install libgeotiff-devel注意…...
执行存储过程报:This function has none of DETERMINISTIC, NO SQL ???
执行存储过程时报如下错你该怎么整? [Err] 1418 - This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in its declaration and binary logging is enabled (you *might* want to use the less safe log_bin_trust_function_creators variable)来…...
JAVA |日常开发中Servlet详解
JAVA |日常开发中Servlet详解 前言一、Servlet 概述1.1 定义1.2 历史背景 二、Servlet 的生命周期2.1 加载和实例化2.2 初始化(init 方法)2.3 服务(service 方法)2.4 销毁(destroy 方法) 三、Se…...
Spring Cloud Alibaba 之 “Sentinel”
从网上下载好sentinel-dashboard-1.6.3.jar,然后执行 java -jar sentinel-dashboard-1.6.3.jar,执行成功之后在浏览器输入localhost:8080,Sentinel的登录名和密码都是sentinel,登陆成功之后看到只有一个首页。 接下来开始整合Spring Cloud Alibaba Sen…...
UE4外挂实现分析-PC端-附源码
UE4外挂实现分析-PC端 游戏分析 分析工具: Cheat Engine 7.5 x64dbg IDA Pro 参考文章: UE4逆向笔记之GWORLD GName GameInstance - 小透明‘s Blog 【项目源码下载】https://download.csdn.net/download/Runnymmede/90079718 本次分析的游戏使用UE4.2…...
力扣88题:合并两个有序数组
力扣88题:合并两个有序数组 题目描述 给定两个按非递减顺序排列的整数数组 nums1 和 nums2,以及它们的长度 m 和 n,要求将 nums2 合并到 nums1,使得合并后的数组仍按非递减顺序排列。 输入与输出 示例 1: 输入&am…...
Lua面向对象实现
Lua中的面向对象是通过表(table)来模拟类实现的,通过setmetatable(table,metatable)方法,将一个表设置为当前表的元表,之后在调用当前表没有的方法或者键时,会再查询元表中的方法和键,以此来实现…...
小程序 模版与配置
WXML模版语法 一、数据绑定 1、数据绑定的基本原则 (1)在data中定义数据 (2)在WXML中使用数据 2、在data中定义页面的数据 3、Mustache语法的格式(双大括号) 4、Mustache语法的应用场景 (…...
【Elasticsearch】实现分布式系统日志高效追踪
🧑 博主简介:CSDN博客专家,历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,…...
探索Go语言中的循环单链表
简介 循环单链表是一种特殊的链表数据结构,它的最后一个节点指向链表的头节点,形成一个闭环。今天我们将探讨如何在Go语言中实现和操作这种数据结构。 为什么选择循环单链表? 连续访问:在循环单链表中,可以无限循环…...
[go-redis]客户端的创建与配置说明
创建redis client 使用go-redis库进行创建redis客户端比较简单,只需要调用redis.NewClient接口创建一个客户端 redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379",Password: "",DB: 0, })NewClient接口只接收一个参数red…...
Windows 和 Linux 系统命令行操作详解:从文件管理到进程监控
1.切换盘符与目录操作 在命令行中,切换盘符和目录是最常见的操作。尽管 DOS 和 Linux 在这些操作上有所不同,但它们都能实现相似的功能。 (1)切换盘符 ①DOS命令:在 DOS 中,切换盘符非常简单,使用 盘符名:ÿ…...
SpringBoot中@Import和@ImportResource和@PropertySource
1. Import Import注解是引入java类: 导入Configuration注解的配置类(4.2版本之前只可以导入配置类,4.2版本之后也可以导入普通类)导入ImportSelector的实现类导入ImportBeanDefinitionRegistrar的实现类 SpringBootApplication…...
etcd-v3.5release-(3)-readIndexRead
笔记1:读操作包括两种,readIndex和serilizable,readIndex指一致性读,一旦a读到了数据x,那么a及a以后的数据都能读到x,readIndex读会先确认本leader是不是有效地leader,如果有效则记录此刻的comm…...
Chrome 中小于 12px 文字的实现方式与应用场景详解
让 Chrome 支持小于 12px 的文字 在 Web 开发中,有时需要将文字显示为小于 12px 的尺寸,尤其是在设计精细的 UI 元素时。虽然大多数浏览器支持小于 12px 的字体大小,但 Chrome 默认情况下会通过调整文本渲染来确保文字可读性,尤其在非常小的文字尺寸下,可能会进行抗锯齿处…...
数据挖掘之数据预处理
引言 数据挖掘是从大量数据中提取有用信息和知识的过程。在这个过程中,数据预处理是不可或缺的关键步骤。数据预处理旨在清理和转换数据,以提高数据质量,从而为后续的数据挖掘任务奠定坚实的基础。由于现实世界中的数据通常…...
slam学习笔记6---样例展示雅可比手推过程
背景:一直在使用模板、自动化求导,对于背后雅可比推导只剩一个基本概念,有必要好好梳理巩固一下。本人水平有限,若推导过程有误,欢迎评论区提出。 假设一个二维slam问题,使用欧式距离观测模型,…...
ThreadLocal 详解
ThreadLocal 详解 ThreadLocal 是 Java 提供的一种线程本地存储机制,用于为每个线程提供独立的变量副本,变量的值仅在线程内可见,从而实现线程隔离。这种特性在需要为每个线程维护独立状态的场景中非常有用,例如用户上下文、事务…...
SQL SERVER 2016 AlwaysOn 无域集群+负载均衡搭建与简测
之前和很多群友聊天发现对2016的无域和负载均衡满心期待,毕竟可以简单搭建而且可以不适用第三方负载均衡器,SQL自己可以负载了。windows2016已经可以下载使用了,那么这回终于可以揭开令人憧憬向往的AlwaysOn2016 负载均衡集群的神秘面纱了。 …...