在 Flink + Kafka 实时数仓中,如何确保端到端的 Exactly-Once
在 Flink + Kafka 构建实时数仓时,确保端到端的 Exactly-Once(精确一次) 需要从 数据消费(Source)、处理(Processing)、写入(Sink) 三个阶段协同设计,结合 Flink 的 检查点机制(Checkpoint) 和 Kafka 的 事务支持。以下是具体实现方法及示例配置:
1. 核心机制
(1) Flink Checkpoint
-
作用:定期将算子的状态(State)和 Kafka 消费偏移量(Offset)持久化到可靠存储(如 HDFS、S3)。
-
配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000); // 60秒触发一次Checkpoint env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // Checkpoint间最小间隔
(2) Kafka 事务
-
两阶段提交(2PC):Flink 的 Kafka Producer 在 Checkpoint 完成时提交事务,确保数据仅写入一次。
-
关键参数:
-
transactional.id
:唯一事务标识,需确保每个 Producer 实例的 ID 唯一。 -
transaction.timeout.ms
:需大于 Flink Checkpoint 间隔(避免事务超时)。
-
2. 端到端 Exactly-Once 实现步骤
(1) Source 端:Kafka Consumer 偏移量管理
-
Flink 的 Kafka Consumer 会在 Checkpoint 时将 消费偏移量 存入状态后端,恢复时从该偏移量重新消费。
-
配置:
Properties props = new Properties(); props.setProperty("bootstrap.servers", "kafka:9092"); props.setProperty("group.id", "flink-group"); props.setProperty("isolation.level", "read_committed"); // 只读取已提交的事务数据 FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props );
(2) 处理阶段:状态一致性
-
Flink 的算子状态(如
KeyedState
、OperatorState
)通过 Checkpoint 持久化,确保故障恢复后状态一致。
(3) Sink 端:Kafka Producer 事务写入
-
事务性 Producer:在 Checkpoint 完成时提交事务,确保数据仅写入一次。
-
配置:
Properties sinkProps = new Properties(); sinkProps.setProperty("bootstrap.servers", "kafka:9092"); sinkProps.setProperty("transaction.timeout.ms", "600000"); // 大于 Checkpoint 间隔 FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>("output-topic",new SimpleStringSchema(),sinkProps,FlinkKafkaProducer.Semantic.EXACTLY_ONCE // 启用Exactly-Once模式 ); stream.addSink(sink);
3. 端到端流程详解
-
Checkpoint 触发:
-
JobManager 向所有 TaskManager 发送 Checkpoint 信号。
-
Kafka Consumer 提交当前消费偏移量到状态后端。
-
Flink 算子状态持久化。
-
Kafka Producer 预提交事务(写入数据但未提交)。
-
-
Checkpoint 完成:
-
所有算子确认状态保存成功后,JobManager 标记 Checkpoint 完成。
-
Kafka Producer 提交事务(数据对下游可见)。
-
-
故障恢复:
-
Flink 回滚到最近一次成功的 Checkpoint。
-
Kafka Consumer 从 Checkpoint 中的偏移量重新消费。
-
Kafka Producer 回滚未提交的事务(避免数据重复)。
-
4. 关键注意事项
-
事务超时时间:确保
transaction.timeout.ms > checkpoint间隔 + max checkpoint duration
。 -
唯一 Transactional ID:每个 Kafka Producer 实例需分配唯一 ID(可通过算子ID + 子任务ID生成)。
-
幂等性 Sink:若 Sink 为非 Kafka 系统(如数据库),需支持幂等写入或事务(如 MySQL 的
INSERT ... ON DUPLICATE KEY UPDATE
)。
5. 示例场景:实时交易风控
-
需求:从 Kafka 读取交易流水,实时计算用户交易频次(1分钟内超过10次触发风控),结果写回 Kafka。
-
实现:
DataStream<Transaction> transactions = env.addSource(kafkaSource).map(parseTransaction); // 解析交易数据 DataStream<Alert> alerts = transactions.keyBy(Transaction::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(1))).process(new FraudDetectionProcessFunction()); // 检测高频交易 alerts.addSink(kafkaSink); // 事务性写入告警结果
-
Exactly-Once 保障:
-
消费偏移量由 Checkpoint 管理。
-
窗口计数状态由 Flink 持久化。
-
告警结果通过 Kafka 事务写入。
-
6. 常见问题与调优
-
问题1:事务超时导致数据丢失 解决:增大
transaction.timeout.ms
(默认15分钟)并监控 Checkpoint 耗时。 -
问题2:Checkpoint 失败 解决:优化反压(如增加并行度)、调大
checkpoint timeout
。 -
问题3:Kafka Producer 缓冲区满 解决:增大
buffer.memory
和batch.size
。
总结
通过 Flink Checkpoint + Kafka 事务 的协同机制,可以实现从 Kafka 消费到 Kafka 写入的端到端 Exactly-Once。核心在于:
-
Flink 统一管理消费偏移量和状态快照;
-
Kafka Producer 通过事务提交保证数据原子性写入;
-
合理配置超时参数与资源,避免因超时或反压导致的一致性中断。
相关文章:
在 Flink + Kafka 实时数仓中,如何确保端到端的 Exactly-Once
在 Flink Kafka 构建实时数仓时,确保端到端的 Exactly-Once(精确一次) 需要从 数据消费(Source)、处理(Processing)、写入(Sink) 三个阶段协同设计,结合 Fli…...
Python数据分析
目录 一、数据分析的核心流程 (一)明确数据分析目标 (二)数据收集 (三)数据清洗 1. 处理缺失值 2. 去除重复值 3. 修正错误值和异常值 (四)数据探索与可视化 1. 计算描述性…...
Java单例模式总结
说明:单例模式的核心是确保一个类只有一个实例,并提供全局访问点。饿汉式和懒汉式是两种常见的实现方式 一、饿汉式和懒汉式 1. 饿汉式(Eager Initialization) public class EagerSingleton {// 类加载时直接初始化实例private…...
《P7167 [eJOI 2020] Fountain (Day1)》
题目描述 大家都知道喷泉吧?现在有一个喷泉由 N 个圆盘组成,从上到下以此编号为 1∼N,第 i 个喷泉的直径为 Di,容量为 Ci,当一个圆盘里的水大于了这个圆盘的容量,那么水就会溢出往下流,直到…...
Pycharm(二十)张量的运算与操作
一、张量的数据类型转换 1.演示data.type(trch.DoubleTensor) #1.创建张量对象 [6 6 6;6 6 6] datatorch.full([2,3],6) print(data.dtype)#默认为torch.int64(LongTensor) #2.转化为double类型 datadata.type(torch.DoubleTensor) print(data.dtype) #3.转换成int类型 datad…...
JVM之内存管理(二)
部分内容来源:JavaGuide二哥Java 说⼀下 JDK1.6、1.7、1.8 内存区域的变化? JDK1.6、1.7/1.8 内存区域发⽣了变化,主要体现在⽅法区的实现: JDK1.6 常量池在方法区 JDK1.7 JDK1.6 使⽤永久代实现⽅法区:JDK1.7 时发…...
蓝桥杯嵌入式第十一届省赛真题
(一)题目 首先要知道P37对应的CubeMx上面的引脚是PB15,给PB15设置成ADC采集。使用到的PA6和PA7的端口要进行定时器配置 100Hz80 000 000/800 *1000 200Hz80 000 000/400 *1000 ADC采集只需要选择好adc1、adc2 再选择好它的通道就可以了,不…...
LLMs之ChatGPT:《Connecting GitHub to ChatGPT deep research》翻译与解读
LLMs之ChatGPT:《Connecting GitHub to ChatGPT deep research》翻译与解读 导读:这篇OpenAI帮助文档全面介绍了将GitHub连接到ChatGPT进行深度代码研究的方法、优势和注意事项。通过连接GitHub,用户可以充分利用ChatGPT强大的代码理解和生成…...
多环境开发
# 应用环境(公共环境,写一些公共配置) spring:profiles:active: dev --- # 设置环境 # 生产环境 spring:config:activate:on-profile: pro server:port: 80 --- # 开发环境 spring:config:activate:on-profile: dev server:port: 81 --- # 测试环境 spring:config:activate:on-…...
游戏引擎学习第269天:清理菜单绘制
回顾并为今天的工作设定目标 昨天我们对调试系统中的菜单处理做了一些清理工作,今天我想继续对它们的展示和使用方式进行一些打磨和改进。今天的计划还不完全确定,我还没有完全决定要做什么,但是我希望能够完成这部分工作,所以我…...
《解锁React Native与Flutter:社交应用启动速度优化秘籍》
React Native和Flutter作为当下热门的跨平台开发框架,在优化应用启动性能方面各有千秋。今天,我们就深入剖析它们独特的策略与方法。 React Native应用的初始包大小对启动速度影响显著。在打包阶段,通过精准分析依赖,去除未使用的…...
Web3 初学者的第一个实战项目:留言上链 DApp
目录 📌 项目简介:留言上链 DApp(MessageBoard DApp) 🧠 技术栈 🔶 1. Solidity 智能合约代码(MessageBoard.sol) 🔷 2. 前端代码(index.html script.js…...
Innovus 25.1 版本更新:助力数字后端物理设计新飞跃
在数字后端物理设计领域,每一次工具的更新迭代都可能为项目带来巨大的效率提升与品质优化。今天,就让我们一同聚焦 Innovus 25.1 版本(即 25.10 版本)的更新要点,探寻其中蕴藏的创新能量。 一、核心功能的强势进 AI…...
Git简介和发展
Git 简介 Git是一个开源的分布式版本控制系统,跨平台,支持Windows、Linux、MacOS。主要是用于项目的版本管理,是由林纳斯托瓦兹(Linux Torvalds)在2005年为Linux内核开发而创建。 起因 在2002年至2005年间,Linux内核开发团队使…...
adb 实用命令汇总
版权归作者所有,如有转发,请注明文章出处:https://cyrus-studio.github.io/blog/ 基础adb命令 # 重启adb adb kill-server# 查看已连接的设备 adb devices# 进入命令行 adb shell# 使用 -s 参数来指定设备 adb -s <设备序列号> shell…...
DAX 权威指南1:DAX计算、表函数与计算上下文
参考《DAX 权威指南 第二版》 文章目录 二、DAX简介2.1 理解 DAX 计算2.2 计算列和度量值2.3 变量2.3.1 VAR简介2.3.2 VAR的特性 2.4 DAX 错误处理2.4.1 DAX 错误类型2.4.1.1 转换错误2.4.1.2 算术运算错误2.4.1.3 空值或 缺失值 2.4.2 使用IFERROR函数拦截错误2.4.2.1 安全地进…...
使用fdisk 、gdisk管理分区
用 fdisk 管理分区 fdisk 命令工具默认将磁盘划分为 mbr 格式的分区 命令: fdisk 设备名 fdisk 命令以交互方式进行操作的,在菜单中选择相应功能键即可 [rootlocalhost ~]# fdisk /dev/sda # 对 sda 进行分区 Command (m for help): # 进入 fdis…...
Python----神经网络(《Deep Residual Learning for Image Recognition》论文和ResNet网络结构)
一、论文 1.1、论文基本信息 标题:Deep Residual Learning for Image Recognition 作者:Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun 单位:Microsoft Research 会议:CVPR 2016 主要贡献:提出了一种深度残…...
PostgreSQL 的 pg_collation_actual_version 函数
PostgreSQL 的 pg_collation_actual_version 函数 pg_collation_actual_version 是 PostgreSQL 中用于检查排序规则实际版本信息的函数,主要与 ICU (International Components for Unicode) 排序规则相关。 函数基本概念 函数定义 pg_collation_actual_version(…...
使用Simulink开发Autosar Nvm存储逻辑
文章目录 前言Autosar Nvm接口设计模型及接口生成代码及arxmlRTE接口mappingRTE代码分析总结 前言 之前介绍过Simulink开发Dem故障触发逻辑,本文接着介绍另外一个常用的功能-Nvm存储的实现。 Autosar Nvm接口 Autosar Nvm中一般在上电初始化的时调用Nvm_ReadAll获…...
嵌入式STM32学习——继电器
继电器模块引脚说明 VCC(): 供电正极。连接此引脚到电源(通常是直流电源),以提供继电器线圈所需的电流。 GND(-): 地。连接此引脚到电源的负极或地。 IN(或…...
更换内存条会影响电脑的IP地址吗?——全面解析
在日常电脑维护和升级过程中,许多用户都会遇到需要更换内存条的情况。与此同时,不少用户也担心硬件更换是否会影响电脑的网络配置,特别是IP地址的设置。本文将详细探讨更换内存条与IP地址之间的关系,帮助读者理解这两者之间的本质…...
AWS SNS:解锁高并发消息通知与系统集成的云端利器
导语 在分布式系统架构中,如何实现高效、可靠的消息通知与跨服务通信?AWS Simple Notification Service(SNS)作为全托管的发布/订阅(Pub/Sub)服务,正在成为企业构建弹性系统的核心组件。本文深度…...
【Java ee初阶】网络编程 TCP
TCP的socket api 两个核心的类 ServerSocket 创建一个这样的对象,就相当于打开了一个socket文件。 这个socket对象是给服务器专门使用的 这个类本身不负责发送接收。 主要负责“建立连接” Socket 创建一个这样的对象,也就相当于打开了一个socket文…...
达索MODSIM实施成本高吗?哪家服务商靠谱?
在数字化转型的浪潮中,越来越多的制造企业开始关注达索系统的MODSIM技术。记得去年参加行业峰会时,一位来自汽车零部件企业的技术总监向我倾诉了他的困扰:"我们都知道MODSIM能提升研发效率,但听说实施成本很高,又…...
ISP接口隔离原则
任何层次的软件设计如果依赖了它并不需要的东西,就会带来意料之外的麻烦。ISP强调使用多个特定的接口,而不是一个总接口,避免依赖不需要的接口。即不需要则不应该知道。 ISP特点 降低耦合度:客户端只依赖它需要的接口࿰…...
AI Agent(8):安全与伦理考量
引言 AI Agent作为具有一定自主性的智能系统,其行为可能产生深远影响。确保这些系统安全、可靠、符合伦理标准,并遵守相关法规,不仅是技术挑战,也是社会责任。 随着AI Agent能力的增强,其潜在风险也在增加,从数据泄露到决策偏见,从自主性滥用到责任归属不清,这些问题…...
Python3虚拟环境与包管理:项目隔离的艺术
Python3虚拟环境与包管理 为什么需要虚拟环境?虚拟环境工具:你的岛屿建设者一、使用venv创建虚拟环境创建虚拟环境激活虚拟环境退出虚拟环境 二、 包管理:岛上的补给系统2.1 pip:Python的包安装工具基本用法依赖管理 2.2 高级包管…...
23、DeepSeekMath论文笔记(GRPO)
DeepSeekMath论文笔记 0、研究背景与目标1、GRPO结构GRPO结构PPO知识点**1. PPO的网络模型结构****2. GAE(广义优势估计)原理****1. 优势函数的定义**2.GAE(广义优势估计) 2、关键技术与方法3、核心实验结果4、结论与未来方向关键…...
Python自动化-python基础(下)
六、带参数的装饰器 七、函数生成器 运行结果: 八、通过反射操作对象方法 1.添加和覆盖对象方法 2.删除对象方法 通过使用内建函数: delattr() # 删除 x.a() print("通过反射删除之后") delattr(x, "a") x.a()3 通过反射判断对象是否有指定…...
用Python绘制动态彩色ASCII爱心:技术深度与创意结合
引言 在技术博客的世界里,代码不仅仅是解决问题的工具,更可以是表达创意的媒介。今天我将分享一个独特的Python爱心代码项目,它结合了数学之美、ASCII艺术和动态效果,展示了Python编程的无限可能。这个项目不仅能运行展示出漂亮的…...
【C++】红黑树
1.红黑树的概念 是一种二叉搜索树,在每个节点上增加一个存储位表示节点的颜色,Red或black,通过对任何一条从根到叶子的路径上各个结点着色方式的限制,确保没有一条路径会比其他路径长出俩倍,是接近平衡的。 2.红黑树…...
链表头插法的优化补充、尾插法完结!
头插法的优化补充 这边我们将考虑到可以将动态创建链表,和插入新链表到链表头前方,成为新链表头的方法分开,使其自由度上升,在创建完链表后,还可以添加链表元素到成为新的链表头。 就是说可以单独的调用这个insertHea…...
Java多线程(超详细版!!)
Java多线程(超详细版!!) 文章目录 Java多线程(超详细版!!)1. 线程 进程 多线程2.线程实现2.1线程创建2.1.1 继承Thread类2.1.2 实现runnable接口2.1.2.1 思考:为什么推荐使用runnable接口?2.1.2.1.1 更高的…...
超详细fish-speech本地部署教程
本人配置: windows x64系统 cuda12.6 rtx4070 一、下载fish-speech模型 注意:提前配置好git,教程可在自行搜索 git clone https://gitclone.com/github.com/fishaudio/fish-speech.git cd fish-speech 或者直接进GitHub中下载也可以 …...
Flink和Spark的选型
在Flink和Spark的选型中,需要综合考虑多个技术维度和业务需求,以下是在项目中会重点评估的因素及实际案例说明: 一、核心选型因素 处理模式与延迟要求 Flink:基于事件驱动的流处理优先架构,支持毫秒级低延迟、高吞吐的…...
解锁 DevOps 新境界 :使用 Flux 进行 GitOps 现场演示 – 自动化您的 Kubernetes 部署
前言 GitOps 是实现持续部署的云原生方式。它的名字来源于标准且占主导地位的版本控制系统 Git。GitOps 的 Git 在某种程度上类似于 Kubernetes 的 etcd,但更进一步,因为 etcd 本身不保存版本历史记录。毋庸置疑,任何源代码管理服务…...
【从零实现JsonRpc框架#1】Json库介绍
1.JsonCpp第三方库 JSONCPP 是一个开源的 C 库,用于解析和生成 JSON(JavaScript Object Notation)数据。它提供了简单易用的接口,支持 JSON 的序列化和反序列化操作,适用于处理配置文件、网络通信数据等场景。 2.Jso…...
使用FastAPI和React以及MongoDB构建全栈Web应用02 前言
Who this book is for 本书适合哪些人阅读 This book is designed for web developers who aspire to build robust, scalable, and efficient web applications. It caters to a broad spectrum of developers, from those with foundational knowledge to experienced prof…...
JavaScript中的数据类型
目录 前言 基本类型 Number 特殊的数值NaN Infinity和-Infinity String Boolean Undefined null Symbol Undefined和Null的区别 引用类型 Object(对象) Array(数组) Function(函数) 函数声…...
AI 助力,轻松进行双语学术论文翻译!
在科技日新月异的今天,学术交流中的语言障碍仍然是科研工作者面临的一大挑战。尤其是对于需要查阅大量外文文献的学生、科研人员和学者来说,如何高效地理解和翻译复杂的学术论文成为了一大难题。然而,由Byaidu团队推出的开源项目PDFMathTrans…...
第3.2.3节 Android动态调用链路的获取
3.2.3 Android App动态调用链路 在Android应用中,动态调用链路指的是应用在运行时的调用路径。这通常涉及到方法调用的顺序和调用关系,特别是在应用的复杂逻辑中,理解这些调用链路对于调试和性能优化非常重要。 1,动态调用链路获…...
【Android】文件分块上传尝试
【Android】文件分块上传 在完成一个项目时,遇到了需要上传长视频的场景,尽管可以手动限制视频清晰度和视频的码率帧率,但仍然避免不了视频大小过大的问题,且由于服务器原因,网络不太稳定。这个时候想到了可以将文件分…...
大模型中的三角位置编码实现
Transformer中嵌入表示 位置编码的实现 import torch import math from torch import nn# 词嵌入位置编码实现 class EmbeddingWithPosition(nn.Module):"""vocab_size:词表大小emb_size: 词向量维度seq_max_len: 句子最大长度 (人为设定,例如GPT2…...
深入详解人工智能数学基础——微积分中的自动微分及其在PyTorch中的实现原理
🧑 博主简介:CSDN博客专家、CSDN平台优质创作者,高级开发工程师,数学专业,10年以上C/C++, C#, Java等多种编程语言开发经验,拥有高级工程师证书;擅长C/C++、C#等开发语言,熟悉Java常用开发技术,能熟练应用常用数据库SQL server,Oracle,mysql,postgresql等进行开发应用…...
【Linux学习笔记】系统文件IO之重定向原理分析
【Linux学习笔记】系统文件IO之重定向原理分析 🔥个人主页:大白的编程日记 🔥专栏:Linux学习笔记 文章目录 【Linux学习笔记】系统文件IO之重定向原理分析前言一. 系统文件I/01.1 一种传递标志位的方法1.2 hello.c写文件:1.3 he…...
《React Native与Flutter:社交应用中用户行为分析与埋点统计的深度剖析》
React Native与Flutter作为两款备受瞩目的跨平台开发框架,正深刻地影响着应用的构建方式。当聚焦于用户行为分析与埋点统计时,它们各自展现出独特的策略与工具选择,这些差异和共性不仅关乎开发效率,更与社交应用能否精准把握用户需…...
Cesium高度参考系统
🌍 Cesium高度参考系统趣味探索 🚀 高度参考系统形象比喻 想象一下,你正在玩一个积木游戏: CLAMP_TO_GROUND:积木被"强力胶水"粘在桌面上,无论桌面高低起伏如何 RELATIVE_TO_GROUND:积木放在"微型支架"上,始终保持离桌面固定距离 NONE:积木漂…...
海纳思(Hi3798MV300)机顶盒遇到海思摄像头
海纳思机顶盒遇到海思摄像头,正好家里有个海思Hi3516的摄像头模组开发板,结合机顶盒来做个录像。 准备工作 海纳斯机顶盒摄像机模组两根网线、两个电源、路由器一块64G固态硬盘 摄像机模组和机顶盒都接入路由器的LAN口,确保网络正常通信。 …...
[python] 类
一 介绍 具有相同属性和行为的事物的通称,是一个抽象的概念 三要素: 类名,属性,方法 格式: class 类名: 代码块 class Pepole:name "stitchcool"def getname(self):return self.name 1.1 创建对象(实例化) 格式: 对象名 类名() p1 Pepole()…...