当前位置: 首页 > news >正文

pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境

JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.20

2、执行python-flink脚本

从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值

from pyflink.table import TableEnvironment, EnvironmentSettingsdef log_processing():# 创建流处理环境env_settings = EnvironmentSettings.in_streaming_mode()t_env = TableEnvironment.create(env_settings)# 设置 Kafka 连接器 JAR 文件的路径# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar")# 定义源表 DDLsource_ddl = """CREATE TABLE source_table(a VARCHAR,b INT  -- 如果 b 字段不重要,可以考虑从源表中移除它) WITH ('connector' = 'kafka','topic' = 'demo','properties.bootstrap.servers' = '192.168.15.130:9092','properties.group.id' = 'test_3','scan.startup.mode' = 'latest-offset','format' = 'json')"""# 定义目标表 DDLsink_ddl = """CREATE TABLE sink_table(a VARCHAR) WITH ('connector' = 'kafka','topic' = 'test_kafka_topic','properties.bootstrap.servers' = '192.168.15.130:9092','format' = 'json')"""# 执行 DDL 语句创建表t_env.execute_sql(source_ddl)#table = t_env.from_path("sql_source")#table.execute().print()table_result  = t_env.execute_sql("select sum(b) sb from source_table")table_result.print()t_env.execute_sql(sink_ddl)# 执行 SQL 查询并将结果插入到目标表# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)t_env.sql_query("SELECT a FROM source_table") \.execute_insert("sink_table").wait()  # 考虑是否真的需要 wait()if __name__ == '__main__':log_processing()
python3 KafkaSource.py

3、启动kafka生产者

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}

可以看到sum(b)值已输出
在这里插入图片描述

4、启动kafka消费者

查看往test_kafka_topic插入的a字段数据已被消费

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092  --from-beginning --topic test_kafka_topic

在这里插入图片描述

相关文章:

pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境 JDK版本:1.8.0_412python版本:3.10.6apache-flink版本:1.20.0flink版本:1.20kafka版本:kafka_2.12-3.1.1flink-sql-connector-kafka版本:3.3.0-1.202、执行python-flink脚本 从kafka的demo获取消…...

【经验分享】私有云运维的知识点

最近忙于备考没关注,有次点进某小黄鱼发现首页出现了我的笔记还被人收费了 虽然我也卖了一些资源,但我以交流、交换为主,笔记都是免费给别人看的 由于当时刚刚接触写的并不成熟,为了避免更多人花没必要的钱,所以决定公…...

影像组学+病理组学+深度学习人工智能应用

影像组学 基础学习内容: 特征提取:使用pyradiomics进行形状、纹理、小波变换等特征提取。特征筛选:应用ICC、相关系数、mRMR、Lasso等方法。建模:使用LR、SVM、RF、XGBoost、LightGBM等机器学习算法。模型评估:通过A…...

how to write 述职pptx as a tech manager

As a technical manager, crafting an effective 述职 (performance review) PPT requires you to highlight your leadership, team accomplishments, technical contributions, challenges faced, and future plans. Heres a structured approach to design your PPT: 1. Cov…...

用户发送请求后服务端i/o工作过程

华子目录 服务端i/o介绍磁盘i/o机械磁盘的寻道时间、旋转延迟和数据传输时间常见的机械磁盘平均寻道时间值常见磁盘的平均延迟时间每秒最大IOPS的计算方法 网络i/o网络I/O处理过程磁盘和网络i/o 一次完整的请求在内部的执行过程 服务端i/o介绍 i/o在计算机中指Input/Output&am…...

功能篇:springboot实现防盗链功能

防盗链(Hotlink Protection)是一种防止其他网站直接链接到你网站的资源(如图片、视频等),从而节省带宽和保护内容的有效手段。在Spring Boot应用程序中实现防盗链功能,可以通过多种方式来达成,例…...

MySQL迁移SQLite

将 MySQL 的表结构和数据迁移到 SQLite,可以通过以下步骤实现。这个过程主要包括导出 MySQL 数据库到 SQL 文件,然后将其导入到 SQLite 数据库中。 步骤 1: 导出 MySQL 数据库 首先,需要将 MySQL 数据库导出为一个 SQL 文件。可以使用 mysq…...

嵌入式面试知识点总结 -- 面试篇

1、请你做个简单的自我介绍 把所有工作内容,分类整理出和岗位匹配的能力关键字,然后围绕关键字展开讲。每段经历要用数据来支撑。 例如: 面试官你好,我叫XXX,毕业于XXX,很荣幸参加此次面试。 围绕面试岗位…...

华为OD机试真题---观看文艺汇演问题

华为OD机试中的“观看文艺汇演问题”是一道考察算法与数据结构能力的题目。以下是对该题目的详细解析: 一、题目描述 为了庆祝某个重要节日(如中国共产党成立100周年),某公园将举行多场文艺表演。很多演出都是同时进行的&#x…...

类OCSP靶场-Kioptrix系列-Kioptrix Level 2

一、前情提要 二、实战打靶 1. 信息收集 1.1. 主机发现 1.2. 端口扫描 1.3.目录遍历 2.漏洞发现 2.1. 登录框测试 2.2. 发现命令执行 2.3 构造命令执行利用payload 3.提权 3.1. 搜索提权exp 3.2. 查看exp信息 3.3. Privilege Escalation的exp利用 exp_9542 一、前…...

openlane

openlane数据集,lane3d_1000里训练集157807张图片,测试集39981张图,md太多了...

修改vscode设置的原理

转载请标明出处:小帆的帆的专栏 修改vscode设置 首先需要理解的是,vscode的系统设置和插件设置都是通过settings.json文件管理的。 vscode中有三个Settings,三个Settings分别对应三个settings.json文件 Default Settings:默认…...

解决docker环境下aspose-words转换word成pdf后乱码问题

描述 环境&#xff1a;docker 部署工具&#xff1a;Jenkins 需求&#xff1a;本地上传的word文档需要转换成pdf 问题&#xff1a;转换之后的pdf文档出现小框框&#xff08;乱码&#xff09; 转换成PDF的操作 pom&#xff1a; <dependency><groupId>org.apach…...

2024年12月16日Github流行趋势

项目名称&#xff1a;PDFMathTranslate 项目维护者&#xff1a;Byaidu reycn hellofinch Wybxc YadominJinta项目介绍&#xff1a;基于 AI 完整保留排版的 PDF 文档全文双语翻译&#xff0c;支持 Google/DeepL/Ollama/OpenAI 等服务&#xff0c;提供 CLI/GUI/Docker。项目star数…...

ElasticSearch 常见故障解析与修复秘籍

文章目录 一、ElasticSearch启动服务提示无法使用root用户二、ElasticSearch启动提示进程可拥有的虚拟内存少三、ElasticSearch提示用户拥有的可创建文件描述符太少四、ElasticSearch集群yellow状态分析五、ElasticSearch节点磁盘使用率过高&#xff0c;read_only状态问题解决六…...

用 Python Turtle 绘制经典汤姆猫:重温卡通角色的经典魅力

用 Python Turtle 绘制经典汤姆猫&#xff1a;重温卡通角色的经典魅力 &#x1f438; 前言 &#x1f438;&#x1f41e;往期绘画>>点击进所有绘画&#x1f41e;&#x1f40b; 效果图 &#x1f40b;&#x1f409; 代码 &#x1f409; &#x1f438; 前言 &#x1f438; 汤…...

数据结构Day4: 链表函数封装 ; 思维导图

目录 作业&#xff1a;实现链表剩下的操作&#xff1a; 任意位置删除 按位置修改 按值查找返回地址 反转 销毁 运行结果 思维导图 作业&#xff1a;实现链表剩下的操作&#xff1a; 1>任意位置删除 2>按位置修改 3>按值查找返回地址 4>反转 5>销毁 任意…...

用 Python Turtle 绘制一只可爱的小狗:用代码捕捉狗狗的萌态

用 Python Turtle 绘制一只可爱的小狗&#xff1a;用代码捕捉狗狗的萌态 &#x1f438; 前言 &#x1f438;&#x1f41e;往期绘画>>点击进所有绘画&#x1f41e;&#x1f40b; 效果图 &#x1f40b;&#x1f409; 代码 &#x1f409; &#x1f438; 前言 &#x1f438; …...

人工智能浪潮来袭:2024年技术革命与产业变革深度解析@附64页PDF文件下载

随着2024年的到来&#xff0c;人工智能&#xff08;AI&#xff09;技术正以前所未有的速度、广度和深度改变着我们的生产和生活方式。在这篇深度解析中&#xff0c;我们将带您一探AI技术的最新发展、产业应用的现状以及未来的安全治理趋势。 技术革命&#xff1a;AI技术的新范…...

python 下载 b站视频 和音频

video_bvid&#xff1a; import os import requests import json import re from bs4 import BeautifulSoup import subprocess # from detail_video import video_bvid# video_bvid 是一个从外部得到的单个视频ID video_bvid BV1cx421Q7veclass BilibiliVideoAudio:def __in…...

【蓝桥杯选拔赛真题93】Scratch青蛙过河 第十五届蓝桥杯scratch图形化编程 少儿编程创意编程选拔赛真题解析

目录 Scratch青蛙过河 一、题目要求 编程实现 二、案例分析 1、角色分析 2、背景分析 3、前期准备 三、解题思路 1、思路分析 2、详细过程 四、程序编写 五、考点分析 六、推荐资料 1、入门基础 2、蓝桥杯比赛 3、考级资料 4、视频课程 5、python资料 Scr…...

flink sink kafka的事务提交现象猜想

现象 查看flink源码时 sink kafka有事务提交机制&#xff0c;查看源码发现是使用两阶段提交策略&#xff0c;而事务提交是checkpoint完成后才执行&#xff0c;那么如果checkpoint设置间隔时间比较长时&#xff0c;事务未提交之前&#xff0c;后端应该消费不到数据&#xff0c…...

Oracle 临时表空间管理与最佳实践

Oracle 临时表空间管理与最佳实践 内容摘要 本文深入探讨了Oracle数据库中临时表空间的管理和最佳实践。主要内容包括&#xff1a; 临时表空间的概述及其在Oracle 19c多租户架构中的特点临时表空间组的优势及其创建方法非临时表空间组的临时表空间日常维护操作命令临时表空间…...

Java转C之继承和多态

在C/C中&#xff0c;继承和多态是面向对象编程&#xff08;OOP&#xff09;的两个重要特性。以下将详细讲解C/C中如何实现继承与多态&#xff0c;同时结合Java的对比&#xff0c;帮助理解两者的异同。 继承的实现 C/C中的继承 继承允许一个类&#xff08;派生类/子类&#xf…...

【密码学】ZUC祖冲之算法

一、ZUC算法简介 ZUC算法&#xff08;祖冲之算法&#xff09;是中国自主研发的一种流密码算法&#xff0c;2011年被3GPP批准成为4G国际标准&#xff0c;主要用于无线通信的加密和完整性保护。ZUC算法在逻辑上采用三层结构设计&#xff0c;包括线性反馈移位寄存器&#xff08;L…...

MacOS系统 快速安装appium 步骤详解

在macOS系统上&#xff0c;你可以通过使用nvm&#xff08;Node Version Manager&#xff09;来管理Node.js的版本&#xff0c;并基于nvm安装的Node.js环境来快捷地安装Appium。以下是具体步骤&#xff1a; 一、安装nvm 下载nvm 访问nvm的GitHub仓库&#xff08;nvm GitHub&…...

SEGGER | 基于STM32F405 + Keil - RTT组件07 - J-Scope数据可视化,RTT方式 + DWT定时器时间戳

导言 在上一章节SEGGER | 基于STM32F405 Keil - RTT组件06 - J-Scope数据可视化&#xff0c;使用RTT方式的第4.3章节提到&#xff0c;如果消息包不包含时间戳的话&#xff0c;那么J-Scope的横坐标的单位时间默认是100us&#xff0c;说白了时间戳是假的。会导致如下问题&#x…...

机器学习支持向量机(SVM)算法

一、引言 在当今数据驱动的时代&#xff0c;机器学习算法在各个领域发挥着至关重要的作用。支持向量机&#xff08;Support Vector Machine&#xff0c;SVM&#xff09;作为一种强大的监督学习算法&#xff0c;以其在分类和回归任务中的卓越性能而备受瞩目。SVM 具有良好的泛化…...

浏览器端的 js 包括哪几个部分

一、核心语言部分 1. 变量与数据类型 变量用于存储数据&#xff0c;在 JavaScript 中有多种数据类型&#xff0c;如基本数据类型&#xff08;字符串、数字、布尔值、undefined、null&#xff09;和引用数据类型&#xff08;对象、数组、函数&#xff09;。 let name "…...

【含开题报告+文档+PPT+源码】基于SpringBoot的开放实验管理平台设计与实现

开题报告 设计开放实验管理平台的目的在于促进科学研究与教学的融合。传统实验室常常局限于特定地点和时间&#xff0c;而开放平台可以为学生、教师和研究人员提供一个便捷的交流与共享环境。通过在线平台&#xff0c;他们可以分享实验资源、交流经验&#xff0c;从而促进科学…...

国内可以访问的github地址

国内的IP直接访问github.com官网一般会出现无法访问或者卡顿问题&#xff0c;可以尝试访问下面的国内的代理网站&#xff1a; GitHub Build and ship software on a single, collaborative platform GitHub...

Spring 框架事务管理深度剖析

1.Spring框架的事务管理有哪些优点 pring框架的事务管理具有以下优点&#xff1a; 声明式事务管理&#xff1a;Spring支持声明式事务管理&#xff0c;这使得开发者可以通过配置而不是编程方式来定义事务边界。这种方式简化了事务管理代码&#xff0c;并且可以减少出错的机会。…...

6.1 初探MapReduce

MapReduce是一种分布式计算框架&#xff0c;用于处理大规模数据集。其核心思想是“分而治之”&#xff0c;通过Map阶段将任务分解为多个简单任务并行处理&#xff0c;然后在Reduce阶段汇总结果。MapReduce编程模型包括Map和Reduce两个阶段&#xff0c;数据来源和结果存储通常在…...

SpringBoot - 动态端口切换黑魔法

文章目录 关键技术点核心原理Code 关键技术点 利用 Spring Boot 内嵌 Servlet 容器 和 动态端口切换 的方式实现平滑更新的方案&#xff0c;关键技术点如下&#xff1a; Servlet 容器重新绑定端口&#xff1a;Spring Boot 使用 ServletWebServerFactory 动态设置新端口。零停…...

【Excel】单元格分列

目录 分列&#xff08;新手友好&#xff09; 1. 选中需要分列的单元格后&#xff0c;选择 【数据】选项卡下的【分列】功能。 2. 按照分列向导提示选择适合的分列方式。 3. 分好就是这个样子 智能分列&#xff08;进阶&#xff09; 高级分列 Tips&#xff1a; 新手推荐基…...

Scratch教学作品 | 3D圆柱体俄罗斯方块——旋转视角的全新挑战! ✨

今天为大家推荐一款创意十足的Scratch益智游戏——《3D圆柱体俄罗斯方块》&#xff01;由Ceratophrys制作&#xff0c;这款作品将经典俄罗斯方块与立体圆柱舞台相结合&#xff0c;为玩家带来了前所未有的空间挑战与乐趣。更棒的是&#xff0c;这款游戏的源码可以在小虎鲸Scratc…...

智慧商城:登录页静态布局,axios请求数据切换图形验证

登录页静态布局 在src目录下新建 styles&#xff0c;主要用于 存放公共样式。在该文件夹下新建common.less文件&#xff0c;并将其在main.js中引入 将图片拷贝到src文件夹下的 assets文件夹下 完成静态布局 点击左箭头能返回到首页 所有组件头部返回左箭头颜色都是一样的&#…...

HTML知识点详解教程

文章目录 HTML知识点详解教程1. HTML基本语法2. HTML标签详解2.1 分区标签 <div>2.2 标题标签 <h1> ~ <h6>2.3 段落标签 <p>2.4 图片标签 <img>2.5 列表标签 <ul> 和 <ol>无序列表 <ul>有序列表 <ol> 2.6 超链接标签 &l…...

知识分享第二十八天-数学篇一

组合.二项式定理.常见导数 组合 让我们通过一个具体的例子来理解组合&#xff08;Combinations&#xff09;的概念 假设你有一个装有5个不同颜色球的袋子&#xff1a;红、蓝、绿、黄和紫。你想从中随机抽取3个球&#xff0c; 不考虑顺序&#xff0c;那么你可以有多少种不同的…...

搭建Tomcat(四)---Servlet容器

目录 引入 Servlet容器 一、优化MyTomcat ①先将MyTomcat的main函数搬过来&#xff1a; ②将getClass()函数搬过来 ③创建容器 ④连接ServletConfigMapping和MyTomcat 连接&#xff1a; ⑤完整的ServletConfigMapping和MyTomcat方法&#xff1a; a.ServletConfigMappin…...

P1029 [NOIP2001 普及组] 最大公约数和最小公倍数问题

题目描述 输入两个正整数 &#x1d465;0,&#x1d466;0&#xff0c;求出满足下列条件的 &#x1d443;,&#x1d444; 的个数&#xff1a; &#x1d443;,&#x1d444;是正整数。 要求 &#x1d443;,&#x1d444; 以 &#x1d465;0为最大公约数&#xff0c;以 &#x1…...

【泛微系统】自定义报表查看权限

自定义报表查询权限 前言:流程自定义报表,可查看每个报表都有哪些人有权限 --SQLserver写法 select a.id,a.workflowname,自定义报表权限 type,b.reportname,c.typename...

NPM国内镜像源多选择与镜像快速切换工具(nrm)介绍

多镜像源选择 淘宝镜像&#xff08;推荐&#xff09; 镜像地址&#xff1a;https://registry.npmmirror.com 特性&#xff1a;官方推荐&#xff0c;镜像更新速度快&#xff0c;稳定性高。 使用方式&#xff1a; npm config set registry https://registry.npmmirror.com恢复…...

详解负载均衡

什么是负载均衡&#xff1f; 想象一下&#xff0c;你有一家餐厅&#xff0c;当有很多客人同时到来时&#xff0c;如果只有一名服务员接待&#xff0c;可能会导致服务变慢。为了解决这个问题&#xff0c;你可以增加更多的服务员来分担工作&#xff0c;这样每位服务员就可以更快…...

AngularJS 与 SQL 的集成应用

AngularJS 与 SQL 的集成应用 引言 在当今的Web开发领域,AngularJS 和 SQL 是两种非常重要的技术。AngularJS,作为一个强大的前端框架,能够帮助开发者构建复杂且高性能的客户端应用。而SQL(Structured Query Language),作为一种广泛使用的数据库查询语言,是管理关系型…...

ANOMALY BERT 解读

出处&#xff1a; ICLR workshop 2023 代码&#xff1a;Jhryu30/AnomalyBERT 可视化效果&#xff1a; 一 提出动机 动机&#xff1a;无监督 TSAD 领域内&#xff0c;“训练集” 也缺失&#xff1a;真值标签&#xff08;GT&#xff09;&#xff1b;换句话说&#xff0c;一个…...

51c视觉~YOLO~合集6~

我自己的原文哦~ https://blog.51cto.com/whaosoft/12830685 一、其他yolo 1.1 Spiking-YOLO​ 使用常规深度神经网络到脉冲神经网络转换方法应用于脉冲神经网络域时&#xff0c;性能下降的很多&#xff0c;深入分析后提出了可能的解释&#xff1a;一是来自逐层归一化的效率…...

软考高级架构 —— 10.6 大型网站系统架构演化实例 + 软件架构维护

10.6 大型网站系统架构演化实例 大型网站的技术挑战主要来自于庞大的用户&#xff0c;高并发的访问和海量的数据&#xff0c;主要解决这类问题。 1. 单体架构 特点: 所有资源&#xff08;应用程序、数据库、文件&#xff09;集中在一台服务器上。适用场景: 小型网站&am…...

两数之和(Hash表)

优质博文&#xff1a;IT-BLOG-CN 一、题目 给定一个整数数组nums和一个整数目标值target&#xff0c;请你在该数组中找出"和"为目标值target的那两个整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同一个元…...

【问题定位记录】哪些情况可能造成403

起因 403是我们平时在http请求中常见的一种错误码&#xff0c;如果有一天别人问你什么情况下可能造成403&#xff0c;我想大家都能想到的一种就是权限问题&#xff0c;比如鉴权失败会造成403。 但实际上不止这一种原因可能造成403&#xff0c;还有一种可能的原因今天就被我遇…...