flink sink kafka
接上文:一文说清flink从编码到部署上线
之前写了kafka source,现在补充kafka sink。完善kafka相关操作。
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401;kafka_2.12-2.5.0。
1. kafka 创建 topic
topic:rv-test-sink。
2.添加依赖
<!--flink cdc kafka 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency>
3.创建运行环境
package com.zl.utils;import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;/*** EnvUtil* @description:*/
public class EnvUtil {/*** 设置flink执行环境* @param parallelism 并行度*/public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为rootSystem.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);if (parallelism >0 ){//设置并行度env.setParallelism(parallelism);} else {env.setParallelism(1);// 默认1}// 添加重启机制
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));// 没有这个配置,会导致“Flink 任务没报错,但是无法同步数据到doris”。// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);//rocksdb状态后端,启用增量checkpointenv.setStateBackend(new EmbeddedRocksDBStateBackend(true));//设置checkpoint路径CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 同一时间只允许一个 checkpoint 进行(默认)checkpointConfig.setMaxConcurrentCheckpoints(1);//最小间隔,10*60*1000=60000checkpointConfig.setMinPauseBetweenCheckpoints(60000);// 取消任务后,checkpoint仍然保存checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint容忍失败的次数checkpointConfig.setTolerableCheckpointFailureNumber(5);//checkpoint超时时间 默认10分钟checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));//禁用operator chain(方便排查反压)env.disableOperatorChaining();return env;}public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//设置时区 东八tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));Configuration configuration = tenv.getConfig().getConfiguration();// 开启miniBatchconfiguration.setString("table.exec.mini-batch.enabled", "true");// 批量输出的间隔时间configuration.setString("table.exec.mini-batch.allow-latency", "5 s");// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条configuration.setString("table.exec.mini-batch.size", "20000");// 开启LocalGlobalconfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");//设置TTL API指定tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));return tenv;}}
4.核心代码
package com.zl.kafka;import com.alibaba.fastjson.JSONObject;
import com.zl.utils.EnvUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;public class KafkaExampleSink {public static void main(String[] args) throws Exception {// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExampleSink");/// ===== 构造kafka sink =====// 相关参数配置可以参考下面这两个文档:①https://cloud.tencent.com/developer/article/2089393// ②https://www.bilibili.com/opus/819228616166473783// kafka配置Properties prop = new Properties();prop.setProperty("bootstrap.servers", "10.86.97.21:9092,10.86.97.21:9093,10.86.97.21:9094");// 当设置为“true”时,生产者将确保流中只写入每条消息的一个副本。prop.setProperty("enable.idempotence", "true");// 指定了生产者在接收到服务器相应之前可以发送多个消息,值越高,占用的内存越大,// 当然也可以提升吞吐量,发生错误时,可能会造成数据的发送顺序改变,其默认值是5.prop.setProperty("max.in.flight.requests.per.connection", "5");prop.setProperty("acks", "all");// 在kafka中消息发送失败时,指定生产者可以重发消息的次数,默认情况下,// 生产者在每次重试之间默认等待100ms,可以通过参数retey.backoff.ms参数来改变这个时间间隔。retries的缺省值:0.prop.setProperty("retries", "5");// 事务超时时间prop.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");String topic = "rv-test-sink";FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(topic,// topicnew KafkaSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {return new ProducerRecord<>(topic, s.getBytes(StandardCharsets.UTF_8));}},prop,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);/// ===== 构造模拟数据 =====JSONObject rvJsonObject = new JSONObject();rvJsonObject.put("dt","2024-12-20");// 日期取当天rvJsonObject.put("uuid","data-stream-1");rvJsonObject.put("report_time",1733881971621L);String mockJson = JSONObject.toJSONString(rvJsonObject);/// ===== sink kafka =====env.fromElements(mockJson).addSink(flinkKafkaProducer).setParallelism(3).name("kafka-sink").uid("kafka-sink");env.execute("kafka-sink-job");}// main}
5.运行
由于不是持续输入流,运行完会结束。
sink到kafka的数据如下:
6.完整代码
完整代码见:完整代码
相关文章:
flink sink kafka
接上文:一文说清flink从编码到部署上线 之前写了kafka source,现在补充kafka sink。完善kafka相关操作。 环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统&#…...
vue万达地产物业缴费分析系统
摘 要 随着互联网趋势的到来,各行各业都在考虑利用互联网将自己推广出去,最好方式就是建立自己的互联网系统,并对其进行维护和管理。在现实运用中,应用软件的工作规则和开发步骤,采用Java技术建设万达地产物业缴费分析…...
数据库 MYSQL的概念
数据库的概念 数据库是按照数据结 构来组织、存储和管理数据的系统,它允许用户高效地存储、检索、更新和管理数据 database:用来组织,存储,管理数据的仓库 数据库的管理系统:DBMS,实现对数据的有效储值&am…...
docker 容器的基本使用
docker 容器 一、docker是什么? 软件的打包技术,就是将算乱的多个文件打包为一个整体,打包技术在没有docker容器之前,一直是有这种需求的,比如上节课我把我安装的虚拟机给你们打包了,前面的这种打包方式是…...
Nginx IP优化限制策略
Nginx 如何限制每个 IP 地址的连接数,优化资源分配? Nginx 限制每个 IP 地址的连接数 Nginx 提供了多种机制来限制单个 IP 地址所能建立的同时连接数,这对于防止资源耗尽和提高服务稳定性至关重要。以下是几种有效策略: 1. 使用…...
某科技局国产服务器PVE虚拟化技术文档
环境介绍 硬件配置 服务器品牌:黄河 型号:Huanghe 2280 V2 Cpu型号:kunpeng-920 磁盘信息 :480SSD * 2 ,4T*4 网卡:板载四口千兆 如下表 四台服务器同等型号配置,均做单节点虚拟化,数据保护采用底层r…...
新能源汽车锂离子电池各参数的时间序列关系
Hi,大家好,我是半亩花海。为了进一步开展新能源汽车锂离子电池的相关研究,本文主要汇总并介绍了电动汽车的锂离子电池的各项参数,通过 MATLAB 软件对 Oxford Dataset 的相关数据集进行数据处理与分析,进一步研究各项参…...
单片机:实现自动关机电路(附带源码)
单片机实现自动关机电路 在许多嵌入式系统或便携式设备中,自动关机功能非常重要,尤其是在电池供电的设备中,防止设备长时间开启以节省电能。自动关机电路的基本功能是检测设备是否处于待机状态,若一定时间内未收到用户操作信号或…...
/etc/fstab 文件学习systemd与该文件关系
文章目录 一、文件字段1.1、设备标识1.2、挂载点1.3、文件系统类型1.4、挂载选项1.5、dump1.5、fsck顺序 二、/etc/fstab 与systemd 的关系2.1、/etc/fstab 与systemd 的关系2.2、systemd 之前/etc/fstab生效过程2.3、systemd 时代/etc/fstab生效过程 三、相关知识3.1、如何更具…...
springcloud基础
一 SpringCloud简介 1.1 SpringCloud是什么 SpringCloud,基于SpringBoot提供了一套微服务解决方案,包括服务注册与发现,配置中心,全链路监控,服务网关,负载均衡,熔断器等组件,除了基于NetFli…...
全面解析 Kubernetes 流量负载均衡:iptables 与 IPVS 模式
目录 Kubernetes 中 Service 的流量负载均衡模式 1. iptables 模式 工作原理 数据路径 优点 缺点 适用场景 2. IPVS 模式 工作原理 数据路径 优点 缺点 适用场景 两种模式的对比 如何切换模式 启用 IPVS 模式 验证模式 总结 Kubernetes 中 Service 的流量负载…...
HTML+CSS+JS制作汽车网站(内附源码,含5个页面)
一、作品介绍 HTMLCSSJS制作一个汽车网站,包含首页、新车发布页、预约试驾页、最新资讯页、品牌故事页等5个静态页面。其中每个页面都包含一个导航栏、一个主要区域和一个底部区域。 二、页面结构 1. 顶部导航栏 包含logo、主导航菜单(首页、新车、二…...
GraalVM完全指南:云原生时代下使用GraalVM将Spring Boot 3应用转换为高效Windows EXE文件
一、前言 在现代软件开发中,启动速度和资源利用率常常是衡量应用性能的关键指标。对于基于Spring Boot的应用来说,虽然它们易于开发和部署,但JVM的启动时间有时会成为一个瓶颈。本文介绍如何使用GraalVM将Spring Boot 3应用编译成原生Windows可执行文件(EXE),从而显著提…...
微软开源GraphRAG的使用教程-使用自定义数据测试GraphRAG
微软在今年4月份的时候提出了GraphRAG的概念,然后在上周开源了GraphRAG,Github链接见https://github.com/microsoft/graphrag,截止当前,已有6900Star。 安装教程 官方推荐使用Python3.10-3.12版本,我使用Python3.10版本安装时,在…...
C++ 中的字面量类型定义
在 C 中,字面量类型(Literal Type)是指可以作为字面量使用的类型。字面量是指代码中直接写出的常量值,比如整数 42、浮点数 3.14、字符串 "hello" 等。而字面量类型则是支持创建这些字面量的类型。 C 中的字面量类型定…...
LeetCode:101. 对称二叉树
跟着carl学算法,本系列博客仅做个人记录,建议大家都去看carl本人的博客,写的真的很好的! 代码随想录 LeetCode:101. 对称二叉树 给你一个二叉树的根节点 root , 检查它是否轴对称。 示例 1: 输…...
Docker Compose 配置指南
目录 1. Docker Compose 配置1.1 基本配置结构1.2 docker-compose.yml 的各部分1.3 常用配置选项 2. Docker Compose 使用方法2.1 创建 Docker Compose 配置文件2.2 启动服务2.3 查看容器状态2.4 查看服务日志2.5 停止服务2.6 重新构建服务 3. Docker Compose 常用命令3.1 dock…...
【Linux开发工具】自动化构建-make/Makefile
🔥个人主页🔥:孤寂大仙V 🌈收录专栏🌈:Linux 🌹往期回顾🌹:【Linux开发工具】gcc和g 🔖流水不争,争的是滔滔不 一、make和Makefile简介1.1 什么是…...
VSCode 搭建Python编程环境 2024新版图文安装教程(Python环境搭建+VSCode安装+运行测试+背景图设置)
名人说:一点浩然气,千里快哉风。—— 苏轼《水调歌头》 创作者:Code_流苏(CSDN) 目录 一、Python环境安装二、VScode下载及安装三、VSCode配置Python环境四、运行测试五、背景图设置 很高兴你打开了这篇博客,更多详细的安装教程&…...
Python 异步协程:从 async/await 到 asyncio 再到 async with
在 Python 3.8 以后的版本中,异步编程变得越来越重要。本文将系统介绍 Python 标准库中的异步编程工具,带领大家掌握 async/await 语法和 asyncio 的使用。 从一个简单的场景开始 假设我们在处理一些耗时的 I/O 操作,比如读取多个文件或处理…...
矩阵-向量乘法的行与列的解释(Row and Column Interpretations):中英双语
本文是学习这本书的笔记 网站是:https://web.stanford.edu/~boyd/vmls/ 矩阵-向量乘法的行与列的解释 矩阵-向量乘法(Matrix-Vector Multiplication)是线性代数中的基本操作,也是机器学习、数据科学和工程中常用的数学工具。本文…...
针对超大规模病理图像分析!华中科技大学提出医学图像分割模型,提高干燥综合征诊断准确性
口干、眼干、皮肤干,每天伴有不明原因的肌肉酸痛和全身乏力,如果以上症状你「中招」了,除了考虑冬季天气干燥外,还应该警惕一种常见却总是被我们忽视的疾病——干燥综合征 (Sjgren’s Syndrome, SS)。 干燥综合征是以外分泌腺高度…...
混合开发环境---使用编程AI辅助开发Qt
文章目录 [toc]1、说明2、演示视频 1、说明 新时代的浪潮早就已经来临,上不了船的人终将被抛弃,合理使用AI辅助开发、提升效率是大趋势 注意:不要被AI奴隶 合理使用AI辅助编程,十倍提升效率。 大部分的编程AI都有vs code插件&…...
Unity复刻胡闹厨房复盘 模块一 新输入系统订阅链与重绑定
本文仅作学习交流,不做任何商业用途 郑重感谢siki老师的汉化教程与代码猴的免费教程以及搬运烤肉的小伙伴 版本:Unity6 模板:3D 核心 渲染管线:URP ------------------------------…...
[前端]HTTP库Axios
一、Axios简介 Axios 是一个基于 Promise 的 HTTP 客户端,用于浏览器和 node.js 环境。它是一个流行的 JavaScript 库,用于发起 HTTP 请求,如 GET、POST、DELETE 等。Axios 提供了易于使用的 API,支持请求和响应的拦截、转换数据格…...
Excel中index()函数
函数功能概述 INDEX 函数用于返回表格或区域中的值或对值的引用。它可以根据指定的行和列的位置从一个单元格区域中提取数据。这个函数有两种形式:数组形式和引用形式。语法结构(数组形式) INDEX(array, row_num, column_num)array࿰…...
linux-----文件命令
文件和目录的基本概念 文件类型: 普通文件:这是最常见的文件类型,用于存储数据,如文本文件、二进制文件等。文本文件可以用文本编辑器打开并查看内容,二进制文件则包含机器可执行的代码或其他特定格式的数据。目录文件…...
lua dofile 传参数
cat 1.lua arg[1] 111 arg[2] 222 dofile(./2.lua) cat 2.lua print("First argument is: " .. arg[1]) print("Second argument is: " .. arg[2]) 执行 lua 1.lua,结果为: First argument is: 111 Second argument is: 222 l…...
【ETCD】【实操篇(二)】如何从源码编译并在window上搭建etcd集群?
要在 Windows 上编译 etcd 及 etcdctl 工具,并使用 bat 脚本启动 etcd 集群,首先需要准备好开发环境并确保依赖项正确安装。下面是从 etcd 3.5 源码开始编译和启动 etcd 集群的详细步骤: 目录 1. 安装 Go 环境2. 获取 etcd 源码3. 编译 etcd…...
重温设计模式--备忘录模式
文章目录 备忘录模式(Memento Pattern)概述定义: 作用:实现状态的保存与恢复支持撤销 / 恢复操作 备忘录模式UML图备忘录模式的结构原发器(Originator):备忘录(Memento)&…...
如何借助边缘智能网关实现厂区粉尘智能监测告警
在诸如木制品加工、纺织品加工、塑料橡胶制品加工等多种工业生产场景中,粉尘问题的隐患和风险不可小觑。如果缺少对生产环境中粉尘的监测和管理,可能发生易燃易爆、环境污染和工人尘肺等生产事故。 针对工业场景中的粉尘状况监测、管理及预警,…...
解析mysqlbinlog
一、前置设置 ps -ef | grep mysql 查看mysql进程对应的安装目录 需设置mysql binlog日志模式为 ROW 二、执行命令 [rootlocalhost bin]# mysqlbinlog --verbose --base64-outputdecode-rows /usr/local/mysql/data/binlog.000069 > 1.sql 查看文件具体内容...
【gym】理解gym并测试gym小游戏CartPole (一)
一、gym与文件位置的联合理解 import gym import inspect# 加载 CliffWalking 环境 env gym.make(CliffWalking-v0)# 获取环境的类 env_class type(env)# 获取环境类所在的文件路径 file_path inspect.getfile(env_class)print(f"The source code for CliffWalking-v0…...
【jvm】内存泄漏的8种情况
目录 1. 说明2. 静态集合类持有对象引用3. 单例模式4. 内部类持有外部类5. 未关闭的连接6. 变量不合理的作用域7. 改变对象的哈希值8. 缓存Cache泄漏9. 监听器和回调 1. 说明 1.内存泄漏(Memory Leak)指的是程序中动态分配的内存由于某种原因没有被释放…...
android:sharedUserId 应用进程声明介绍
背景 adb install 安装系统软件报错,原因是签名不一致,进程改变。 代码分析 AndroidManifest.xml 定义的 android:sharedUserId 应用归属进程不同,从phone切换到system。 初始配置 <manifest xmlns:android="http://schemas.android.com/apk/res/android"c…...
WPSJS:让 WPS 办公与 JavaScript 完美联动
随着办公自动化需求的日益增长,WPS Office 推出了 WPSJS,这是一款强大的开发者工具,允许开发者通过 JavaScript 脚本与 WPS 办公软件进行互动。无论是在表格中自动填充数据、在文档中修改格式,还是在演示文稿中插入动态内容&#…...
【Linux进程】进程间通信(共享内存、消息队列、信号量)
目录 前言 1. System V IPC 2. 共享内存 系统调用接口 shmget ftok shmat shmdt shmctl 共享内存的读写 共享内存的描述对象 3. 消息队列 msgget msgsnd msgctl 消息队列描述对象 4. 信号量 系统调用接口 semget semctl 信号量描述对象 5. 系统层面IPC资源 6.…...
负载均衡的原理
负载均衡(Load Balancing)是一种计算机技术,用于在多个服务器、网络连接、计算资源之间合理分配工作负载,以提升应用程序的可用性、性能和可扩展性,以下是详细介绍: 工作原理 流量分配:负载均衡…...
Flash Attention
op融合 原始方法: 痛点:多次读取、写入显存。 解决:中间结果不保存,1个kernel顺序算完多个操作。 反向传播时用到这些中间结果要求导,怎么办? 答:类似activation checkpointing,重新…...
Craft CMS 模板注入导致 Rce漏洞复现(CVE-2024-56145)(附脚本)
0x01 产品描述: Craft CMS 是一个灵活且强大的内容管理系统(CMS),专为创意团队和开发人员设计,提供高度可定制、直观且性能优越的网站和内容管理解决方案。它以用户友好的界面、强大的插件生态系统以及支持现代web开发最佳实践的特性而闻名0x02 漏洞描述: 由于模板…...
步进电机位置速度双环控制实现
步进电机位置速度双环控制实现 野火stm32电机教学 提高部分-第11讲 步进电机位置速度双环控制实现(1)_哔哩哔哩_bilibili PID模型 位置环作为外环,速度环作为内环。设定目标位置和实际转轴位置的位置偏差,经过位置PID获得位置期望,然后讲位置期望(位置变化反映了转轴的速…...
Sigrity Optimize PI CapGen仿真教程文件路径
为了方便读者能够快速上手和学会Sigrity Optimize PI和 Deacap Generate 的功能,将Sigrity Optimize PI CapGen仿真教程专栏所有文章对应的实例文件上传至以下路径 https://download.csdn.net/download/weixin_54787054/90171471?spm1001.2014.3001.5503...
open Feign日志输出
openFeign默认是没有日志输出的,只有在open Feign所在的包的级别达到debug才会有输出,而且级别有四级。 四种日志级别: OpenFeign只会在FeignClient所在包的日志级别为DEBUG时,才会输出日志。而且其日志级别有4级: NON…...
进程间关系与守护进程
个人主页:C忠实粉丝 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 C忠实粉丝 原创 进程间关系与守护进程 收录于专栏[Linux学习] 本专栏旨在分享学习Linux的一点学习笔记,欢迎大家在评论区交流讨论💌 目录 1. 进程组 什…...
C++设计模式:组合模式(公司架构案例)
组合模式是一种非常有用的设计模式,用于解决**“部分-整体”**问题。它允许我们用树形结构来表示对象的层次结构,并且让客户端可以统一地操作单个对象和组合对象。 组合模式的核心思想 什么是组合模式? 组合模式的目的是将对象组织成树形结…...
ubuntu 安装docker
Step1:更新系统软件包 sudo apt update Step2:安装依赖包【用于通过HTTPS来获取仓库】 sudo apt install apt-transport-https ca-certificates curl software-properties-common Step3:添加Docker官方GPG密钥 sudo -i curl -fsSL https://…...
PSDK的编译与ROS包封装
本文档讲述在NIVIDIA开发板上使用大疆提供的Payload SDK获取无人机实时GPS信息的方法,以及基于Payload SDK发布ROS GPS话题信息的方法。 文章目录 0 实现目标1 Payload SDK1.1 PSDK 源码的编译1.2 PSDK 的使用 2 遥测数据的读取2.1 示例代码结构2.2 读取机载GPS信息…...
【工作流】工作顺序
背景 当时的情况是:没有产品经理,后端直接和需求方对接;前端只能短时间投入大部分时间要忙别的;只有3个角色:需求方,后端,前端; 当时直接执行的 直接使用会议了解需求,…...
Unity2021.3.16f1可以正常打开,但是Unity2017.3.0f3却常常打开闪退或者Unity2017编辑器运行起来就闪退掉
遇到问题: 从今年开始,不知道咋回事,电脑上的Unity2017像是变了个人似得,突然特别爱闪退掉,有时候还次次闪退,真是让人无语,一直以来我都怀疑是不是电脑上安装了什么别的软件了,导致…...
Java基础面试题20:Java语言sendRedirect()和forward()方法有什么区别?
Java基础面试题:Java语言sendRedirect()和forward()方法有什么区别? 在 Java Web 开发中,sendRedirect() 和 forward() 是两个非常常用的方法,但它们有一些核心区别。我们来用最简单的方式给你解释清楚。 一、sendRedirect() 和 …...