Flink的数据流图中的数据通道 StreamEdge 详解
本文从基础原理到代码层面逐步解释 Flink 的数据通道 StreamEdge
,尽量让初学者也能理解。
主要思路:从概念开始,逐步深入到实现细节,并结合伪代码来逐步推导。
第一步:什么是 StreamEdge?
StreamEdge
是 Flink 中用于表示数据流图(Stream Graph)中两个算子(Operator)之间数据传输关系的对象。简单来说,它是一条“数据通道”,定义了数据如何从一个算子流向下游的另一个算子。想象成一条水管,上游的水(数据)通过这条管子流到下游。
- 为什么需要 StreamEdge?
Flink 是一个分布式流处理框架,数据需要在不同的任务(Task)之间传递。StreamEdge
负责描述这种传递的逻辑,包括“从哪里来”(源算子)、“到哪里去”(目标算子)、“数据如何分区”(Partitioning)等。
第二步:数据流图的基础
在 Flink 中,用户定义的程序(比如“读取数据 -> 过滤 -> 聚合”)会被转换成一个逻辑执行计划,叫做 StreamGraph
。这个图由以下部分组成:
- StreamNode:表示算子(比如 map、filter 等操作)。
- StreamEdge:表示算子之间的连接。
例如:
输入数据 -> map 操作 -> sink 操作
在 StreamGraph
中:
- 有 3 个
StreamNode
:输入源(source)、map、sink。 - 有 2 个
StreamEdge
:输入源(source)到 map,map 到 sink。
StreamEdge
的作用是连接这些节点,并定义数据传输的规则。
第三步:StreamEdge 的核心属性
StreamEdge
是一个 Java 类,我们来看看它的主要属性(基于 Flink 源代码,比如 org.apache.flink.streaming.api.graph.StreamEdge
):
- 源节点 ID(sourceId)
表示数据从哪个算子发出。 - 目标节点 ID(targetId)
表示数据流向哪个算子。 - 分区策略(Partitioning)
定义数据如何分配到下游。比如:FORWARD
:直接发给下游的同一个任务。HASH
:根据键(key)哈希分配。BROADCAST
:发送到下游所有任务。
- 数据类型(OutputTag)
如果有侧输出(side output),会标记数据的类型。
用生活中的例子比喻:
- 源节点和目标节点像是寄信的“发件人”和“收件人”。
- 分区策略像是“快递分拣规则”(按地址分、全部发、随机发等)。
第四步:StreamEdge 的创建过程
让我们一步步推导 StreamEdge
如何在 Flink 中生成:
1. 用户代码
假设写了一个简单的 Flink 程序:
DataStream<String> input = env.fromElements("a", "b", "c");
DataStream<String> mapped = input.map(x -> x.toUpperCase());
mapped.print();
这里有 3 个操作:输入源 -> map -> print。
2. 构建 StreamGraph
Flink 的 StreamGraphGenerator
会把这个程序翻译成图:
- 创建
StreamNode
:为每个算子分配一个 ID。 - 创建
StreamEdge
:连接这些节点。
伪代码(简化的生成过程):
StreamNode sourceNode = new StreamNode(1, "Source");
StreamNode mapNode = new StreamNode(2, "Map");
StreamNode sinkNode = new StreamNode(3, "Sink");StreamEdge edge1 = new StreamEdge(sourceNode, mapNode, Partitioning.FORWARD);
StreamEdge edge2 = new StreamEdge(mapNode, sinkNode, Partitioning.FORWARD);
3. 分区策略的推导
- 从
source
到map
,数据是顺序传递的,所以用FORWARD
。 - 从
map
到sink
,也是直接输出,所以也是FORWARD
。
如果用户加了 keyBy
(比如按某个字段分组),分区策略会变成 HASH
,数据会根据键的哈希值分配。
第五步:底层实现与数据传输
StreamEdge
只是逻辑表示,实际数据传输发生在运行时,由 StreamTask
和 RecordWriter
完成。
-
序列化与发送
- 数据被序列化(比如 "A" 变成字节数组)。
RecordWriter
根据StreamEdge
的分区策略决定发送目标。
-
网络传输
- 如果下游在另一台机器上,数据通过网络发送(基于 Netty)。
- 分区策略决定数据发到哪个子任务(Subtask)。
-
反序列化与处理
- 下游任务收到数据,反序列化后交给目标算子处理。
用伪代码表示:
class RecordWriter {void emit(Record record, StreamEdge edge) {if (edge.partitioning == FORWARD) {sendToLocalTask(edge.targetId, record);} else if (edge.partitioning == HASH) {int targetSubtask = hash(record.key) % numSubtasks;sendToRemoteTask(edge.targetId, targetSubtask, record);}}
}
第六步:从代码层面看 StreamEdge
以下是 StreamEdge
类的简化版(基于 Flink 1.18 左右的源码):
public class StreamEdge {private final int sourceId; // 源节点 IDprivate final int targetId; // 目标节点 IDprivate final StreamPartitioner<?> partitioner; // 分区策略private final OutputTag outputTag; // 侧输出标记(可选)public StreamEdge(StreamNode source, StreamNode target, StreamPartitioner<?> partitioner) {this.sourceId = source.getId();this.targetId = target.getId();this.partitioner = partitioner;this.outputTag = null; // 默认无侧输出}public int getSourceId() { return sourceId; }public int getTargetId() { return targetId; }public StreamPartitioner<?> getPartitioner() { return partitioner; }
}
StreamPartitioner
是一个接口,定义了具体分区逻辑(比如ForwardPartitioner
、KeyGroupStreamPartitioner
)。
第七步:完整推导示例
假设输入是 {("key1", 1), ("key2", 2)}
,经过 keyBy(key)
和 sum(value)
:
-
StreamGraph:
StreamNode1
:SourceStreamNode2
:KeyBy + SumStreamEdge
:Source -> KeyBy,分区策略为HASH
。
-
数据流:
- ("key1", 1) 的哈希值算出目标子任务 0。
- ("key2", 2) 的哈希值算出目标子任务 1。
RecordWriter
根据StreamEdge
的HASH
策略发送。
-
结果:
- 子任务 0 计算 "key1" 的和。
- 子任务 1 计算 "key2" 的和。
总结
StreamEdge
是 Flink 数据流处理的核心桥梁:
- 逻辑层面:定义算子间的数据流向和分区规则。
- 运行时层面:指导数据如何在分布式环境中传输。
- 代码层面:通过属性和分区器实现灵活的分发。
通过上面的逐步推导,从“水管”比喻到代码实现,希望你对 StreamEdge
的原理有了清晰的理解!
相关文章:
Flink的数据流图中的数据通道 StreamEdge 详解
本文从基础原理到代码层面逐步解释 Flink 的数据通道 StreamEdge,尽量让初学者也能理解。 主要思路:从概念开始,逐步深入到实现细节,并结合伪代码来逐步推导。 第一步:什么是 StreamEdge? StreamEdge 是 F…...
OpenCV 图形API(25)图像滤波-----均值滤波(模糊处理)函数blur()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 使用归一化的方框滤波器模糊图像。 该函数使用以下核来平滑图像: K 1 k s i z e . w i d t h k s i z e . h e i g h t [ 1 1 ⋯ …...
基于频率约束条件的最小惯量需求评估,包括频率变化率ROCOF约束和频率最低点约束matlab/simulink
基于频率约束条件的最小惯量评估,包括频率变化率ROCOF约束和频率最低点约束matlab/simulink 1建立了含新能源调频的频域仿真传函模型,虚拟惯量下垂控制 2基于构建的模型,考虑了不同调频系数,不同扰动情况下的系统最小惯量需求...
.pdf,.docx,.doc文档在一页纸上显示4页内容(详细步骤)
生活中常见一页纸上显示4页内容,我们熟知的是打印文件时,选择“每页4版”,但如果也是为了方便阅读,想要设置成一张纸上显示4页内容,又该怎么做呢?下面分享.docx和.pdf文档,一张纸上显示4页内容的…...
基于CNN-BiLSTM-GRU的深度Q网络(Deep Q-Network,DQN)求解移动机器人路径规划,MATLAB代码
一、深度Q网络(Deep Q-Network,DQN)介绍 1、背景与动机 深度Q网络(DQN)是深度强化学习领域的里程碑算法,由DeepMind于2013年提出。它首次在 Atari 2600 游戏上实现了超越人类的表现,解决了传统…...
高并发场景下的 Java 性能优化
在当今数字化时代,高并发场景已成为众多 Java 应用面临的常态,如电商大促、在线直播等活动期间,系统需承受巨大的访问压力。因此,Java 性能优化在高并发场景下显得尤为重要。传统的人工编写代码优化方式不仅耗时费力,还…...
Java 设计模式:观察者模式详解
Java 设计模式:观察者模式详解 观察者模式(Observer Pattern)是一种行为型设计模式,它定义了对象之间的一对多依赖关系,当一个对象的状态发生变化时,所有依赖它的对象都会收到通知并自动更新。这种模式广泛…...
Linux vagrant 导入Centos
前言 vagrant 导入centos 虚拟机 前提要求 安装 virtualbox 和vagrant<vagrant-disksize> (Linux 方式 Windows 方式) 创建一键部署centos 虚拟机 /opt/vagrant 安装目录/opt/VirtualBox 安装目录/opt/centos8/Vagrantfile (可配置网络IP,内存…...
linux Ubuntu 如何删除文件,错误删除后怎么办?
一、删除文件的常用方法 命令行删除 普通删除:rm 文件名 (示例:rm old_file.txt) 强制删除(无提示):rm -f 文件名 (示例:rm -f locked_file.txt) 删除目录…...
【前端】事件循环专题
引入 以下情况是为什么呢? //q1 for (var i 0; i < 3; i) {setTimeout(() > {console.log(i);}, 1000); } // console: // 3 // 3 // 3//q2 let name;setTimeout(() > {name name;console.log(name); }, 1000);if (name) {name newname;console.log(n…...
3DMAX笔记-UV知识点和烘焙步骤
1. 在展UV时,如何点击模型,就能选中所有这个模型的uv 2. 分多张UV时,不同的UV的可以设置为不同的颜色,然后可以通过颜色进行筛选。 3. 烘焙步骤 摆放完UV后,要另存为一份文件,留作备份 将模型部件全部分成…...
【深度学习】PyTorch实现VGG16模型及网络层数学原理
一、Demo概述 代码已附在文末 1.1 代码功能 ✅ 实现VGG16网络结构✅ 在CIFAR10数据集上训练分类模型 1.2 环境配置 详见【深度学习】Windows系统Anaconda CUDA cuDNN Pytorch环境配置 二、各网络层概念 2.1 卷积层(nn.Conv2d) nn.Conv2d(in_cha…...
Spring 事务
29.Spring管理事务的方式有几种? Spring中的事务分为编程式事务和声明式事务。 编程式事务是在代码中硬编码,通过 TransactionTemplate或者 TransactionManager 手动管理事务,事务范围过大会出现事务未提交导致超时,比较适合分布…...
GPT - TransformerDecoderBlock
本节代码定义了一个 TransformerDecoderBlock 类,它是 Transformer 架构中解码器的一个基本模块。这个模块包含了多头自注意力(Multi-Head Attention)、前馈网络(Feed-Forward Network, FFN)和层归一化(Lay…...
【C语言】预处理(预编译)(C语言完结篇)
一、预定义符号 前面我们学习了C语言的编译和链接。 在C语言中设置了一些预定义符号,其可以直接使用,预定义符号也是在预处理期间处理的。 如下: 可以看到上面的预定义符号,其都有两个短下划线,要注意的是ÿ…...
【Kubernetes】Kubernetes 如何进行日志管理?Fluentd / Loki / ELK 适用于什么场景?
由于 Kubernetes 运行在容器化的环境中,应用程序和系统日志通常分布在多个容器和节点上,传统的日志管理方法(例如直接访问每个节点的日志文件)在 Kubernetes 中不适用。 因此,Kubernetes 引入了集中式日志管理方案&am…...
从 SaaS 到 MCP:构建 AI Agent 生态的标准化服务升级之路
从 SaaS 到 MCP:构建 AI Agent 生态的标准化服务升级之路 —— 以数据连接器 dslink 的技术改造实践为例 引言:AI Agent 时代的 SaaS 服务范式转型 在生成式 AI 爆发式发展的 2025 年,AI Agent 已从概念验证走向企业级应用落地,…...
Linux 入门五:Makefile—— 从手动编译到工程自动化的蜕变
一、概述:Makefile—— 工程编译的 “智能指挥官” 1. 为什么需要 Makefile? 手动编译的痛点:当工程包含数十个源文件时,每次修改都需重复输入冗长的编译命令(如gcc file1.c file2.c -o app),…...
CST入门教程:如何从SYZ参数提取电容C和电感L --- 双端口
上期解释了单端口计算S参数,然后后处理很容易提取L或C,已经满足基本需求。 这期我们看复杂一点的情况,电路中放两个端口,比如S2P: 或集总电路: 或导入SPICE: 两个端口的Y和Z参数就是四个量了,Y…...
桌面版本及服务器版本怎么查看网络源软件包的url下载路径
服务器版本: ### 利用yumdownloader工具 - 首先安装yum-utils软件包,它包含yumdownloader工具。执行命令: bash yum install yum-utils - 安装完成后,使用yumdownloader --urls <package_name>命令来获取软件包的下载UR…...
汽车零部件产线节能提效,工业网关解锁数据采集 “密码”
在汽车零部件生产领域,高效的生产监控与精准的数据采集至关重要。工业网关作为智能工厂的关键枢纽,正发挥着不可替代的作用,助力产线实现电表等多种仪表数据的采集与高效监控。 背景简析 汽车零部件产线涉及众多设备与环节,各类电…...
量化策略分类、优劣势及对抗风险解析
一、常见量化策略分类及优劣势 1. 趋势跟踪策略(Trend Following) 原理:通过捕捉价格趋势(如均线突破、动量指标)进行交易。 代表模型:海龟交易法则、Dual Thrust。 优势: 在强趋势市场&am…...
Linux调试工具——gdb/cgdb
📝前言: 这篇文章我们来讲讲Linux调试工具——gdb/cgdb: 🎬个人简介:努力学习ing 📋个人专栏:Linux 🎀CSDN主页 愚润求学 🌄其他专栏:C学习笔记,C…...
SQLite + Redis = Redka
Redka 是一个基于 SQLite 实现的 Redis 替代产品,实现了 Redis 的核心功能,并且完全兼容 Redis API。它可以用于轻量级缓存、嵌入式系统、快速原型开发以及需要事务 ACID 特性的键值操作等场景。 功能特性 Redka 的主要特点包括: 使用 SQLi…...
使用 Terraform 部署 Azure landing zone
Azure 登陆区是架构完善的环境,遵循 Microsoft 针对 Azure 云架构的最佳实践。它们为团队运行工作负载提供了良好管理的基础,从而提供了可扩展性并促进了云的采用。 如果您有兴趣部署 Azure 登陆区,Terraform 是一个不错的选择。本教程概述的…...
【搭建博客网站】老旧笔记本“零成本逆袭”
写在前面:本博客仅作记录学习之用,部分图片来自网络,如需引用请注明出处,同时如有侵犯您的权益,请联系删除! 文章目录 前言博客网站搭建免费域名本地主机安装虚拟机安装宝塔及配置花生壳内网穿透 磁盘扩容 …...
XHR、FetchAxios详解网络相关大片文件上传下载
以下是 XHR(XMLHttpRequest) 与 Fetch API 的全面对比分析,涵盖语法、功能、兼容性等核心差异: 一、语法与代码风格 XHR(基于事件驱动) 需要手动管理请求状态(如 onreadystatechange 事件)和错误处理,代码冗长且易出现回调地狱。 const xhr = new XMLHttpRequest(); x…...
共享内存(与消息队列相似)
目录 共享内存概述 共享内存函数 (1)shmget函数 功能概述 函数原型 参数解释 返回值 示例 结果 (2)shmat函数 功能概述 函数原型 参数解释 返回值 (3)shmdt函数 功能概述 函数原型 参数解释…...
【3D开发SDK】HOOPS SDKS如何在BIM行业运用?
Tech Soft 3D提供了支持核心功能的软件开发工具,使开发人员可以使用Windows,Linux,OSX和移动平台等广泛的平台来构建巨大而复杂的建筑和BIM应用程序。HOOPS SDK支持多种格式的CAD导入和3D查看技术。这些技术受到了Trimble,RIB&…...
纳米软件矿用电源模块自动化测试方案分享
矿用电源模块主要是用于矿井等危险环境的一种电源系统,它可以为矿井中的仪器提供充足的电力支持。由于矿用电源经常用在危险环境中,因此对于矿用电源的稳定性要求极为严格。 纳米软件矿用电源模块自动化测试方案 测试需求分析 矿用电源模块作为矿井作业…...
pycharm中安装Charm-Crypto
一、安装依赖 1、安装gcc、make、perl sudo apt-get install gcc sudo apt-get install make sudo apt-get install perl #检查版本 gcc -v make -v perl -v 2、安装依赖库m4、flex、bison(如果前面安装过pypbc的话,应该已经装过这些包了) sudo apt-get update sudo apt…...
RTX30系显卡运行Tensorflow 1.15 GPU版本
30系显卡只支持cuda11.0及以上版本,但很多tensorflow项目用的仍然是1.1x版本,这些版本需要cuda10或者以下版本,这就导致在30系显卡上无法正常运1.1x版本的tensorflow,最近几天我也因为这个问题头疼不已,网上一番搜索…...
adb|scrcpy的安装和配置方法|手机投屏电脑|手机声音投电脑|adb连接模拟器或手机
adb|scrcpy的安装和配置方法手机投屏电脑|手机声音投电脑|adb连接模拟器或手机或电视 引言 在数字设备交织的现代生活中,adb(Android Debug Bridge)与 scrcpy 宛如隐匿的强大工具,极大地拓展了我们操控手机、模拟器乃至智能电视等…...
LangChain4j(2):Chat、流式与文生图模型功能
本文将探讨 LangChain4j 的聊天对话、流式对话以及文生图这三种常见且实用的功能,以及实际代码示例 一、聊天对话(ChatLanguageModel) 在 LangChain4j 中,使用ChatLanguageModel进行基本的聊天对话简单直观。以下是一段示例代码&a…...
Uniapp当中的async/await的作用
一、原始代码的行为(使用 async/await) const getUserMessagePlan async () > {// 等待两个异步操作完成const tabsList await message.getTagesList(); // 等待获取标签列表const tagsStateList await message.getTagsStateList(); // 等…...
JS包装类型Array
reduce()函数 没有起始值的执行过程 有初始值的执行过程 计算对象 是对象数组的情况 数组类型 方法...
Cursor + MCP让Blender实现自动建模
先决条件 Blender 3.0 或更新版本 Python 3.10 或更高版本 uv Blender安装 && 插件安装 下载Blender,版本最好是3.x以上的版本,选择适合自己的平台,地址:Download — blender.org 安装插件 从https://g…...
websocket深入-webflux+websocket
文章目录 背景版本约定配置文件代码使用webflux使用websocket配置文件handler基类实现类注册路由 背景 基于更复杂的情况和更高的开发要求,我们可能会遇到必须同时要使用webflux和websocket的情况。 版本约定 JDK21Springboot 3.2.0Fastjson2lombok 配置文件 &…...
LangChain-输出解析器 (Output Parsers)
输出解析器是LangChain的重要组件,用于将语言模型的原始文本输出转换为结构化数据。本文档详细介绍了输出解析器的类型、功能和最佳实践。 概述 语言模型通常输出自然语言文本,但在应用开发中,我们经常需要将这些文本转换为结构化的数据格式…...
wsl2+ubuntu22.04安装blenderproc教程
本章教程,介绍如何在windows操作系统上通过wsl2+Ubuntu22.04上安装blenderproc。 一、pipi安装方式 推荐使用minconda3安装Python环境。 pip install Blenderproc二、源码安装 1、下载源码 git clone https://github.com/DLR-RM/BlenderProc2、安装依赖 cd BlenderProc &am…...
矩阵热图】】
一、基础热图绘制 import matplotlib.pyplot as plt import numpy as np# 模拟数据生成 matching_history [np.random.randint(0, 2, (5, 3)) for _ in range(4)] # 5个UE,3个边缘服务器,4次迭代# 绘制最终匹配矩阵 plt.figure(figsize(10, 6)) plt.i…...
opencv人脸性别年龄检测
一、引言 在计算机视觉领域,人脸分析是一个热门且应用广泛的研究方向。其中,人脸性别年龄检测能够自动识别图像或视频流中人脸的性别和年龄信息,具有诸多实际应用场景,如市场调研、安防监控、用户个性化体验等。OpenCV 作为一个强…...
idea里面不能运行 node 命令 cmd 里面可以运行咋回事啊
idea里面不能运行 node 命令 cmd 里面可以运行咋回事啊 在 IntelliJ IDEA(或其他 JetBrains 系列 IDE)中无法运行某些命令,但在系统的命令提示符(CMD)中可以正常运行,这种情况通常是由于以下原因之一导致的…...
【ROS】软件包后期添加依赖
【ROS】软件包后期添加依赖 前言整体思路修改 package.xml1. 构建依赖(build_depend)2. 构建导出依赖(build_export_depend)3. 运行依赖(exec_depend)如何修改 修改 CMakeLists.txt修改 find_package其他修…...
十三届蓝桥杯Java省赛 B组(持续更新..)
目录 十三届蓝桥杯Java省赛 B组第一题:星期计算第二题:山第三题:字符统计第四题:最少刷题数第五题:求阶乘第六题:最大子矩阵第七题:数组切分第八题:回忆迷宫第九题:红绿灯…...
生成式人工智能的价值回归:重塑技术、社会与个体的发展轨迹
在数字化浪潮的席卷之下,生成式人工智能(Generative AI)正以前所未有的速度重塑人类社会的面貌。这项技术不仅被视为人工智能发展的新阶段,更被赋予了推动生产力跃升、加速社会形态变革的历史使命。生成式人工智能的价值回归,不仅体现在技术本身的革新与突破,更在于其对个…...
【工具开发教程】通过批量OCR识别PDF扫描件中的文本,给PDF批量重命名,基于WPF和阿里云的实现方案,超详细
以下是基于WPF和阿里云实现批量OCR识别PDF扫描件中的文本,并给PDF批量重命名的项目方案,包含项目背景、界面设计、代码步骤和开发总结。 一、项目背景 在日常办公或学习中,处理大量PDF扫描件时,常常需要手动提取文件中的文本内容并重命名文件。这种方式效率低下且容易出错…...
AI 重构 Java 遗留系统:从静态方法到 Spring Bean 注入的自动化升级
在当今快速发展的软件行业中,许多企业都面临着 Java 遗留系统的维护和升级难题。这些老旧系统往往采用了大量静态方法,随着业务的不断发展,其局限性日益凸显。而飞算 JavaAI 作为一款强大的 AI 工具,为 Java 遗留系统的重构提供了…...
JS—同源策略:2分钟掌握同源策略
个人博客:haichenyi.com。感谢关注 一. 目录 一–目录二–什么是“同源”?三–同源策略的限制范围四–允许跨源的场景五–如何绕过同源策略(安全方式)六–同源策略的安全意义七–总结 二. 什么是“同源”? …...
【C++】关于scanf是否需要使用的快速记忆
在 C 语言中,scanf 函数用于从标准输入读取数据并存储到变量中。scanf 函数需要知道变量的内存地址,以便将输入的数据存储到正确的内存位置。这就是为什么在大多数情况下需要使用 & 符号的原因。 1. 为什么需要使用& & 符号用于获取变量的内…...