FlinkUDF用户自定义函数深度剖析
Flink 作为一款强大的流批一体数据处理引擎,其灵活性和扩展性在很大程度上依赖于用户自定义函数(User-Defined Functions, UDF)。UDF 允许开发者根据业务需求扩展 Flink 的核心功能,实现复杂的数据转换、聚合或分析。本文将系统性地讲解 Flink UDF 的类型、实现方式、优化技巧及实际应用场景,涵盖从基础到高级的完整知识体系。
一、UDF 概述
1.1 什么是 UDF?
UDF 是用户根据业务逻辑自定义的函数,用于在数据处理过程中执行特定的操作。Flink 支持多种类型的 UDF,包括标量函数(ScalarFunction)、表函数(TableFunction)、聚合函数(AggregateFunction)等,覆盖了从单行数据转换到多行数据生成、分组聚合等多种场景。
1.2 为什么需要 UDF?
- 灵活性:处理复杂业务逻辑(如自定义加密、数据清洗)。
- 性能优化:通过代码优化替代低效的 SQL 操作。
- 复用性:封装通用逻辑,跨项目复用。
- 扩展性:集成外部服务(如调用机器学习模型)。
二、Flink UDF 类型与实现
2.1 标量函数(ScalarFunction)
定义:一对一转换,输入一行数据返回单个值。
典型场景:字符串处理、数值计算、类型转换。
实现步骤:
- 继承
org.apache.flink.table.functions.ScalarFunction
。 - 实现
eval()
方法,支持重载多个参数类型。 - 注册函数到 TableEnvironment。
示例(Java):
public class ToUpperCase extends ScalarFunction {public String eval(String input) {return input.toUpperCase();}
}// 注册并使用
tableEnv.createTemporaryFunction("to_upper", ToUpperCase.class);
tableEnv.sqlQuery("SELECT to_upper(name) FROM Users");
Scala 实现:
class ToUpperCase extends ScalarFunction {def eval(input: String): String = input.toUpperCase
}
2.2 表函数(TableFunction)
定义:一对多转换,输入一行数据返回多行结果(类似 SQL 的 LATERAL TABLE)。
典型场景:解析 JSON 数组、字符串拆分、行转列。
实现步骤:
- 继承
org.apache.flink.table.functions.TableFunction<T>
。 - 实现
eval()
方法,通过collect(T)
输出多行。 - 使用
CROSS JOIN LATERAL TABLE
或LEFT JOIN LATERAL TABLE
调用。
示例(拆分字符串):
public class SplitString extends TableFunction<String> {public void eval(String input, String delimiter) {for (String s : input.split(delimiter)) {collect(s);}}
}// SQL 调用
tableEnv.sqlQuery("SELECT name, word FROM Users, LATERAL TABLE(split_string(description, ' '))"
);
注意事项:
- 需指定输出类型
getResultType()
(或在 Scala 中使用注解@DataTypeHint
)。
2.3 聚合函数(AggregateFunction)
定义:多对一转换,基于一组数据计算聚合结果(如 SUM、COUNT)。
核心概念:
- 累加器(Accumulator):中间状态存储。
- createAccumulator():初始化累加器。
- accumulate():更新累加器。
- getValue():生成最终结果。
示例(自定义平均值):
public class CustomAvg extends AggregateFunction<Double, Tuple2<Long, Double>> {public Tuple2<Long, Double> createAccumulator() {return Tuple2.of(0L, 0.0);}public void accumulate(Tuple2<Long, Double> acc, Double value) {acc.f0 += 1;acc.f1 += value;}public Double getValue(Tuple2<Long, Double> acc) {return acc.f1 / acc.f0;}
}// 注册并使用
tableEnv.createTemporaryFunction("custom_avg", CustomAvg.class);
tableEnv.sqlQuery("SELECT custom_avg(salary) FROM Employees");
高级用法:
- 支持
retract()
(回撤数据)和merge()
(会话窗口合并)。
2.4 表聚合函数(TableAggregateFunction)
定义:多对多聚合,输出多行结果(如 Top N)。
实现方法:
- 继承
TableAggregateFunction<T, ACC>
。 - 使用
emitValue()
或emitUpdateWithRetract()
输出结果。
示例(求 Top 2 分数):
public class Top2 extends TableAggregateFunction<Tuple2<Double, Integer>, Tuple2<Double, Double>> {public void accumulate(Tuple2<Double, Double> acc, Double value) {if (value > acc.f0) {acc.f1 = acc.f0;acc.f0 = value;} else if (value > acc.f1) {acc.f1 = value;}}public void emitValue(Tuple2<Double, Double> acc, Collector<Tuple2<Double, Integer>> out) {out.collect(Tuple2.of(acc.f0, 1));out.collect(Tuple2.of(acc.f1, 2));}
}
调用方式:需使用 flatAggregate
(Table API)或自定义 SQL 解析。
三、高级 UDF 功能
3.1 异步 UDF(AsyncFunction)
应用场景:调用高延迟外部服务(如 HTTP API、数据库)时避免阻塞。
实现方法:
- 继承
AsyncFunction
,使用asyncInvoke()
结合CompletableFuture
。 - 配置异步执行环境(线程池大小、超时时间)。
示例(异步查询外部服务):
public class AsyncLookup extends AsyncFunction<String, String> {@Overridepublic void asyncInvoke(String key, ResultFuture<String> resultFuture) {CompletableFuture.supplyAsync(() -> externalService.query(key)).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));}
}
优化建议:
- 使用缓存减少重复请求。
- 控制并发度防止资源耗尽。
3.2 向量化 UDF(Vectorized Function)
背景:在批处理中提升 CPU 缓存利用率,减少虚函数调用。
实现方法:
- 继承
VectorizedScalarFunction
,处理ColumnVector
对象。 - 启用参数
TableConfigOptions#SQL_EXEC_VECTORIZED_ENABLED
。
适用场景:高吞吐批处理作业。
四、UDF 性能优化
4.1 序列化优化
- 使用 Flink 类型系统(如
Types.POJO
)替代 Java 原生序列化。 - 避免在 UDF 中传递不可序列化对象。
4.2 类型推导优化
- 通过
@DataTypeHint
或@FunctionHint
显式指定输入/输出类型,避免反射开销。
4.3 资源管理
- 在
open()
方法中初始化资源(如数据库连接),在close()
中释放。 - 使用 Flink 托管内存(如
MemorySegment
)减少 GC 压力。
4.4 代码生成
- 复杂逻辑可考虑生成字节码(如 Apache Calcite 优化器)。
五、UDF 应用场景与案例
5.1 数据清洗
- 使用标量函数过滤非法字符、标准化日期格式。
public class SanitizeInput extends ScalarFunction {public String eval(String input) {return input.replaceAll("[^a-zA-Z0-9]", "");}
}
5.2 实时统计分析
- 聚合函数计算移动平均、滑动窗口 Top K。
- 结合窗口函数实现 TUMBLE、HOP 等复杂窗口逻辑。
5.3 复杂事件处理(CEP)
- 表函数解析日志事件流,生成异常模式序列。
5.4 机器学习集成
- 异步 UDF 调用 TensorFlow Serving 模型进行实时预测。
六、注意事项与最佳实践
6.1 状态管理
- 避免在 UDF 中维护可变状态,需使用 Flink 状态 API(如
ValueState
)。
6.2 线程安全
- 确保 UDF 的线程安全性(尤其异步函数中的资源访问)。
6.3 异常处理
- 捕获异常并通过
Collector
输出错误信息,避免作业失败。
6.4 测试策略
- 单元测试:验证 UDF 逻辑。
- 集成测试:验证在 Flink 集群中的行为。
相关文章:
FlinkUDF用户自定义函数深度剖析
Flink 作为一款强大的流批一体数据处理引擎,其灵活性和扩展性在很大程度上依赖于用户自定义函数(User-Defined Functions, UDF)。UDF 允许开发者根据业务需求扩展 Flink 的核心功能,实现复杂的数据转换、聚合或分析。本文将系统性…...
Python图形界面编程(一)
目录 一、相关的库 1、tkinter库 2、PyQt库 二、图形界面编程要点 三、tkinter控件 1、tkinter控件表 2、tkinter的常用控件 3、tkinter的扩展控件 四、tkinter布局 1、简单示例 2、默认情况下的grid规则 3、调整窗口和网格 (1)调整窗口 &…...
HarmonyOS Grid 网格列表可长按 item 拖动移动位置
方案一 @Component struct WorkCircleCreatePage {// 存储车控列表的数组@State VehicleDoorArr: IVehicleDoor[] = []// 当前移动的Item索引@State CurrentIndex: number = -1// 拖动时显示的数据@State MoveItem: IVehicleDoor = { title: , icon: }// 拖动时放大倍数@State…...
出现 ORA-00904: “TENANT_ID“: 标识符无效 解决方法
目录 前言1. 问题所示2. 原理分析3. 解决方法前言 🤟 找工作,来万码优才:👉 #小程序://万码优才/r6rqmzDaXpYkJZF 爬虫神器,无代码爬取,就来:bright.cn 1. 问题所示 执行代码的时候,出现如下所示: org.springframework.jdbc.BadSqlGrammarException:</...
榜单持久化
榜单持久化的基本流程是这样的: 创建表 持久化Redis数据到数据库 清理Redis数据 现在,创建表的动作已经完成,接下来就轮到Redis数据的持久化了。持久化的步骤如下: 读取Redis数据 判断数据是否存在 不存在,直接结束…...
璞华ChatBI闪耀2025数博会:对话式数据分析引领数智化转型新范式
4月17日至19日,2025中国(武汉)数字经济产业博览会在武汉盛大举办,璞华集团携自主研发的“ChatBI自然语言问答式数据分析平台”惊艳亮相。以 "通过对话让数据说话" 为主题,璞华集团在 A3-T8 展位构建了沉浸式…...
力扣DAY63-67 | 热100 | 二分:搜索插入位置、搜索二维矩阵、排序数组查找元素、搜索旋转排序数组、搜索最小值
前言 简单、中等 √ 二分法思路很简单,但是判断边界太麻烦了!难道真的要去背模板吗 搜索插入位置 我的题解 循环条件左不超过右,目标大于中间值(向下取整)时,左中1,小于,右中-1&…...
leetcode-哈希表
哈希表 127. 单词接龙 题目 字典 wordList 中从单词 beginWord 到 endWord 的 转换序列 是一个按下述规格形成的序列 beginWord -> s(1) -> s(2) -> ... -> s(k): 每一对相邻的单词只差一个字母。 对于 1 < i < k 时,每个 s(i) 都在…...
信息技术有限公司项目管理手册
这篇文档是信息技术有限公司的项目管理指导手册,对软件公司项目管理者具有重要价值,主要体现在以下几个方面: 管理全面规范 涵盖内容广:从项目的整体管理到各个具体领域,如范围管理、进度管理、成本管理等&…...
TFTP服务调试
在tftpboot目录下进行sudo minicom 启动内核时 问题:程序启动卡在Loading阶段 原因:tftp协议的问题 、或者网卡配置的问题 解决:1.检查网线是否插好 多试几次 2.检查tftp服务是否正常 在minicom中调试ping pc机的ip地址 2.进入boot调…...
date-picker组件的shortcuts为什么不能配置在vue的data的return中
在 Vue 中,shortcuts 是一个选项,通常用于配置像 date-picker 这样的组件的日期快捷方式。这里有一些原因解释为什么 shortcuts 不应该配置在 data 的 return 中,而是应该配置在 data 的外部(例如,直接作为组件的一个属…...
迭代器模式:统一数据遍历方式的设计模式
迭代器模式:统一数据遍历方式的设计模式 一、模式核心:将数据遍历逻辑与数据结构解耦 在软件开发中,不同的数据结构(如数组、链表、集合)有不同的遍历方式。如果客户端直接依赖这些数据结构的内部实现来遍历元素&…...
RocketMQ 核心架构速览
欢迎光临小站:致橡树 文章现有讲述比较简单,后续逐渐丰富各部分内容。 Apache RocketMQ 作为阿里巴巴开源的一款分布式消息中间件,凭借其高吞吐、低延迟、高可用等特性,成为金融级稳定性场景的首选解决方案。本文将深入剖析 Roc…...
kafka安装、spark安装
kafka简介 Kafka就是一个分布式的用于消息存储的消息队列。 kafka角色 Kafka中存储的消息,被消费后不会被删除,可以被重复消费,消息会保留多长,由kafka自己去配置。默认7天删除。背后的管理工作由zookeeper来管理。 kafka安装 …...
迅为RK3562开发板ARM四核A53核心板多种系统适配全开源
迅为RK3562开发板ARM四核A53核心板多种系统适配全开源 RK3562开发板(2GB内存16GB存储)...
用交换机连接两台电脑,电脑A读取/写电脑B的数据
1、第一步,打开控制面板中的网络和共享中心,如下图配置,电脑A和电脑B均要配置; 注意:要保证电脑A和电脑B在同一子网掩码下,不同的IP地址; 2、在电脑上同时按‘CommandR’,在弹出的输…...
线程入门3
synchronized修饰方法 synchronized可以修饰代码块(在线程入门2中有例子),也可以修饰普通方法和静态方法。 修饰普通方法 修饰普通方法简化写法: 修饰静态方法 修饰静态方法简化写法: 注意:利用synchronized上锁,锁的…...
【C++】AVL树
目录 一、AVL树的引入 二、AVL树 🍔AVL树的概念 🍟AVL树节点的定义 🌮AVL树的插入 🥪AVL树的旋转 三、AVL树的验证 四、结语 一、AVL树的引入 🌟我们知道 map/multimap/set/multiset 这几个容器的共同点是&#…...
Java大师成长计划之第1天:Java编程基础入门
📢 友情提示: 本文由银河易创AI(https://ai.eaigx.com)平台gpt-4o-mini模型辅助创作完成,旨在提供灵感参考与技术分享,文中关键数据、代码与结论建议通过官方渠道验证。 欢迎来到“Java大师成长计划”系列文…...
Java线程中断机制详解
中断机制是Java中一种协作式的线程停止方式,它提供了一种优雅的线程间通信机制,用于请求另一个线程停止当前工作。 中断机制的核心概念 中断标志位(Interrupt Status) 每个线程都有一个boolean类型的中断状态标志(native方法控制)…...
gem5-gpu教程06 回归测试
gem5-gpu包括gem5风格的回归测试,以避免常见错误,并在变更集之间保持模拟系统性能的一致性。如果你想为gem5-gpu做出贡献,你必须确保你的更改通过了包含的回归测试。 回归测试是一种软件测试类型,其主要目的是确保新代码的更改没有对现有功能造成影响。在软件开发过程中,当…...
查询Hologres或postgresql中的数据
因Hologres使用postgresql的语法.所以两者查询一样. 方案1: import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; import java.util.List;/*** 一个使用简单连接池管理PostgreSQL连接的工具类。*/ publi…...
C# 文件读取
文件读取是指使用 C# 程序从计算机文件系统中获取文件内容的过程。将存储在磁盘上的文件内容加载到内存中,供程序处理。主要类型有:文本文件读取(如 .txt, .csv, .json, .xml);二进制文件读取(如 .jpg, .pn…...
On the Biology of a Large Language Model——Claude团队的模型理解文章【论文阅读笔记】其一CLT与LLM知识推理
这个学习笔记,是在精读Anthropic的博客 On the Biology of a Large Language Model 的过程中留下的笔记。 由于原文非常长,我会分2-3 个博客来写。 作者的思路 作者对常用的LLM特征解读工具 SAE/Transcoder 进行了优化,增加了跨层连接的能力…...
Postman忘记密码访问官网总是无响应
1.Header Editor插件下载 百度网盘下载: 链接:https://pan.baidu.com/s/1EV6cY7TYQVgPjip3v-vhfQ 提取码:yyds 2.插件配置 下载规则url:https://azurezeng.github.io/static/HE-GoogleRedirect.json 、U(向上)、L(向左)和 R(向右)组成的 48 字符描…...
深度学习中的黑暗角落:梯度消失与梯度爆炸问题解析
📌 友情提示: 本文内容由银河易创AI(https://ai.eaigx.com)创作平台的gpt-4o-mini模型生成,旨在提供技术参考与灵感启发。文中观点或代码示例需结合实际情况验证,建议读者通过官方文档或实践进一步确认其准…...
【数字图像处理】机器视觉(1)
判别相对应的点 1. 图像灰度化 2. 局部特征 3. 仿射不变性特征 图像变化的类型 【1】几何变化:旋转、相似(旋转 各向相同的尺度缩放)、仿射(非各向相同的尺度缩放) 【2】灰度变化:仿射灰度变化 角点 角…...
# 构建和训练一个简单的CBOW词嵌入模型
构建和训练一个简单的CBOW词嵌入模型 在自然语言处理(NLP)领域,词嵌入是一种将词汇映射到连续向量空间的技术,这些向量能够捕捉词汇之间的语义关系。在这篇文章中,我们将构建和训练一个简单的Continuous Bag of Words…...
Ubuntu20.04下GraspNet复现流程中的问题
pytorchcudacudnn的版本问题相对于GraspNet来说至关重要!!!至关重要!!!至关重要!!!(重要的事情说三边) 我的显卡是3070 那么首先说结论 使用30系…...
【ROS2】机器人操作系统安装到Ubuntu简介
主要参考: https://book.guyuehome.com/ROS2/1.系统架构/1.3_ROS2安装方法/ 官方文档:https://docs.ros.org/en/humble/Installation.html 虚拟机与ubuntu系统安装 略,见参考文档 ubutun换国内源,略 1. 设置本地语言 确保您有…...
从0到1掌握机器学习核心概念:用Python亲手构建你的第一个AI模型(超多代码+可视化)
🧠 一、开始 真正动手实现一个完整的AI项目!从数据预处理、特征工程、模型训练,到评估与调优,一步步还原你在动画视频中看到的所有核心知识点。 📦 二、环境准备 建议使用 Python 3.8,推荐工具࿱…...
Java面试题汇总
1王二哥 https://javabetter.cn/sidebar/sanfene/redis.html#_10-redis-%E6%8C%81%E4%B9%85%E5%8C%96%E6%96%B9%E5%BC%8F%E6%9C%89%E5%93%AA%E4%BA%9B-%E6%9C%89%E4%BB%80%E4%B9%88%E5%8C%BA%E5%88%AB 2.小林 https://www.xiaolincoding.com/redis/data_struct/command.html#…...
Ollama API 应用指南
1. 基础信息 默认地址: http://localhost:11434/api数据格式: application/json支持方法: POST(主要)、GET(部分接口) 2. 模型管理 API (1) 列出本地模型 端点: GET /api/tags功能: 获取已下载的模型列表。示例:curl http://lo…...
React SSR + Redux 导致的 Hydration 报错踩坑记录与修复方案
一条“Hydration failed”的错误,让我损失了半天时间 背景 我在用 Next.js App Router Redux 开发一个任务管理应用,一切顺利,直到打开了 SSR(服务端渲染),突然看到这个令人头皮发麻的报错: …...
【论文精读】Reformer:高效Transformer如何突破长序列处理瓶颈?
目录 一、引言:当Transformer遇到长序列瓶颈二、核心技术解析:从暴力计算到智能优化1. 局部敏感哈希注意力(LSH Attention):用“聚类筛选”替代“全量计算”关键步骤:数学优化: 2. 可逆残差网络…...
iOS18 MSSBrowse闪退
iOS18 MSSBrowse闪退 问题方案结果 问题 最近升级了电脑系统(15.4.1),并且也升级了xcode(16.3)开发工具。之后打包公司很早之前开发的项目。 上线之后发现在苹果手机系统18以上,出现了闪退问题。 涉及到的是第三方MSSBrowse,在选择图片放大的…...
create_function()漏洞利用
什么是 create_function() create_function() 是 PHP 早期提供的一个用来创建匿名函数的函数: $func create_function($a,$b, return $a $b;); echo $func(1, 2); // 输出 3 第一个参数是函数的参数列表(字符串形式),第二个参…...
leetcode-数组
数组 31. 下一个排列 题目 整数数组的一个 排列 就是将其所有成员以序列或线性顺序排列。 例如,arr [1,2,3] ,以下这些都可以视作 arr 的排列:[1,2,3]、[1,3,2]、[3,1,2]、[2,3,1] 。 整数数组的 下一个排列 是指其整数的下一个字典序更大…...
Tailwind CSS 实战:基于 Kooboo 构建个人博客页面
在现代 web 开发中,Tailwind CSS 作为一款实用优先的 CSS 框架,能让开发者迅速搭建出具有良好视觉效果的页面;Kooboo 则是一个强大的快速开发平台,提供了便捷的页面管理和数据处理功能。本文将详细介绍如何结合 Tailwind CSS 和 K…...
C#学习1_认识项目/程序结构
一、C#项目文件的构成 1.新建一个项目 2.运行项目 3.认识文件 1)解决方案(Solution):组织多个项目的容器 抽象理解:餐厅 解决方案.sln文件,点击即可进入VS编辑 2)项目(…...
边缘计算在工业自动化中的应用:开启智能制造新时代
在工业4.0的浪潮中,智能制造成为推动工业发展的核心驱动力。随着物联网(IoT)技术的广泛应用,工业设备之间的互联互通变得越来越紧密,但这也带来了数据处理和传输的挑战。边缘计算作为一种新兴技术,通过将计…...
《MySQL:MySQL表的内外连接》
表的连接分为内连接和外连接。 内连接 内连接实际上就是利用where子句对两种表形成的笛卡尔积进行筛选,之前的文章中所用的查询都是内连接,也是开发中使用的最多的连接查询。 select 字段 from 表1 inner join 表2 on 连接条件 and 其他条件࿱…...
人工智能催化民航业变革:五大应用案例
航空业正在经历一场前所未有的技术革命,人工智能正成为变革的主要催化剂。从停机坪到航站楼,从维修机库到客户服务中心,人工智能正在从根本上重塑航空公司的运营和服务方式。这种转变并非仅仅停留在理论上——全球主要航空公司已从人工智能投…...
机器视觉中有哪些常见的光学辅助元件及其作用?
在机器视觉领域,光学元件如透镜、反射镜和棱镜扮演着至关重要的角色。它们不仅是高精度图像捕获的基础,也是提升机器视觉系统性能的关键。深入了解这些光学元件的功能和应用,可以帮助我们更好地掌握机器视觉技术的精髓。 透镜:精…...
Stream API 对两个 List 进行去重操作
在 Java 8 及以上版本中,可以使用 Lambda 表达式和 Stream API 对两个 List 进行去重操作。以下是几种常见的去重场景及对应的 Lambda 表达式实现方式: 1. 合并两个 List 并去重 List<String> list1 Arrays.asList("A", "B"…...
lerna 8.x 详细教程
全局安装 lerna npm install lerna -g初始化项目 mkdir lerna-cli-do cd lerna-cli-do npm init -y初始化项目 lerna init --packages="packages/*"lerna create 创建子项目 lerna create core lerna create util...
ROS第十二梯:ros-noetic和Anaconda联合使用
1) 概述 ros-noetic默认Python版本是Python2.7,但在使用过程中,通常需要明确调用python3进行编译。 Anaconda: 支持创建独立的python2/3环境,避免系统库冲突; 方便安装ROS依赖的科学计算库(如Numpy,Pandas)和机器学习框架; 核心目标:在anaconda环…...
网络原理 - 5(TCP - 2 - 三次握手与四次挥手)
目录 3. 连接管理 建立连接 - 三次挥手 三次握手的意义 断开连接 - 四次挥手 握手和挥手的相似和不同之处 连接管理过程中涉及到的 TCP 状态转换 完! 3. 连接管理 连接管理分为建立连接 和 断开连接~(important 重点!) 建…...
【开源】STM32HAL库移植Arduino OneWire库驱动DS18B20和MAX31850
项目开源链接 github主页https://github.com/snqx-lqh本项目github地址https://github.com/snqx-lqh/STM32F103C8T6HalDemo作者 VXQinghua-Li7 📖 欢迎交流 如果开源的代码对你有帮助,希望可以帮我点个赞👍和收藏 项目说明 最近在做一个项目…...