Flink 系列之七 - Data Stream API的源算子原理
之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。
注意:由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19。
代码参考:https://github.com/forever1986/flink-study.git
目录
- 1 Source的底层原理
- 1.1 Source
- 1.2 三个关键组件
- 1.3 源算子的流程
- 2 自定义数据源
- 2.1 新建lesson04子模块
- 2.2 定义检查点及其序列化类
- 2.3 定义三大组件
- 2.4 定义Source和main方法类
- 2.5 运行测试
上一章了解了Flink的源算子,本章将再进入一层,了解一下Flink源算子的扩展,自定义自己的源算子。
1 Source的底层原理
Flink提供了很好的扩展性,支持自定义Source源算子。如果在网上检索Flink自定义源算子,都会搜索到关于如何通过SourceFunction实现自定义源算子,但是你会发现新版本的SourceFunction已经被标注为废弃,这是因为SourceFunction相关的接口功能相对单一,没有考虑当需要对数据源的数据出现各种问题时的处理,因此Flink即将废弃,而是使用底层的Source。下面的示例,将通过实现Source新方式来实现。
不过在实现自定义之前,要先了解Source的底层原理,在《Flink的官方文档的Data Sources》中有对其详细的描述。官方文档讲得很多,可能让人看得有点迷糊。这里使用主要的源码,来剖析Flink中Source的相关内容,并使用一个自定义示例来演示一下。
1.1 Source
Source的接口。它就像一个工厂类,帮助构造SplitEnumerator和SourceReader以及相应的序列化器。简单来说就是给Flink内部调用声明一些方法,用户通过实现这些方法,Flink才能调用到自定义的SplitEnumerator和SourceReader以及相应的序列化器。下面是Source的方法说明:
public interface Source<T, SplitT extends SourceSplit, EnumChkT>extends SourceReaderFactory<T, SplitT> {/*** 设置该源算子是有界流还是无界流*/Boundedness getBoundedness();/*** 创建分片枚举器(SplitEnumerator)*/SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)throws Exception;/*** 从某个检查点创建分片枚举器(SplitEnumerator),*/SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception;/*** 创建数据分片(SourceSplit)的序列化类*/SimpleVersionedSerializer<SplitT> getSplitSerializer();/*** 创建检查点的序列化类*/SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}
Source还继承SourceReaderFactory接口,有一个创建源阅读器(SourceReader)的方法
public interface SourceReaderFactory<T, SplitT extends SourceSplit> extends Serializable {/*** 创建源阅读器(SourceReader)*/SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) throws Exception;
}
1.2 三个关键组件
通过从上面的源码,可以得知通过Source需要创建一些实现的组件,而有三个组件比较关键,如下官方图:
首先是官方图里面,描述了一个基本的内容,一个是源算子实现过程中需要实现三个关键组件,他们在整个流程中扮演很重要角色:
- 分片(SourceSplit) :是对一部分 Source 数据的包装,如一个文件或者日志分区。分片是 Source进行任务分配和数据并行读取的基本粒度。
- 源阅读器(SourceReader) :会请求分片并进行处理,例如读取分片所表示的文件或日志分区。SourceReader 在 TaskManagers 上的 SourceOperators 并行运行,并产生并行的事件流/记录流。
- 分片枚举器(SplitEnumerator) :会生成分片并将它们分配给 SourceReader。该组件在 JobManager 上以单并行度运行,负责对未分配的分片进行维护,并以均衡的方式将其分配给 reader。
看完上面的图以及三个组件之后,你可能有点感觉,但是还是了解不深,这里通过一个比喻的方式来描述整个流程:
分片(SourceSplit):把它比作一个打包好的快递
源阅读器(SourceReader):把它比作一个快递员
分片枚举器(SplitEnumerator):把它比作一个快递分类员
整个流程就是:快递分类员不定期的从仓库里取出物品,将物品按照一定规则打包成一个个快递,查看是否有空闲的快递员,如果有,按照一定规则分发快递给快递员。快递员这边负责实际配送,当快递员送完一个快递之后,回来告诉快递分类员,自己已经空闲,等待快递分类员分发新的快递。
这样描述是不是就比较清楚了,现在再来看看三个组件里面的方法的作用,最后通过一个简单流程图描述整个过程:
分片枚举器(SplitEnumerator)类:
public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>extends AutoCloseable, CheckpointListener {/*** 启动一个分片枚举器(SplitEnumerator)都会先调用该方法(一般需要启动一个定时器,不断读取数据并分发数据)* 而SplitEnumeratorContext的callAsync方法提供了非常方便创建一个定时器(当然你也可以自己实现)*/void start();/*** 源阅读器(SourceReader)调用SourceReaderContext#sendSplitRequest(),就是告诉分片枚举器(SplitEnumerator)自己空闲,分片枚举器(SplitEnumerator)可以将其加入到空闲列表*/void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);/*** 添加源阅读器(SourceReader)*/void addReader(int subtaskId);/*** 定时的快照方法,也就是保存当前处理的情况,防止程序挂了,可以从检查点开始,而不是从头开始*/CheckpointT snapshotState(long checkpointId) throws Exception;/*** 关闭方法*/@Overridevoid close() throws IOException;/*** 当所有检查点执行完成时调用该方法*/@Overridedefault void notifyCheckpointComplete(long checkpointId) throws Exception {}/*** 处理SourceEvent。这个方法是用于监听SourceEvent事件,可用于与源阅读器(SourceReader)交互。*/default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}/*** 将数据分片(SourceSplit)添加回分片枚举器(SplitEnumerator)。只有在源阅读器(SourceReader)发生异常读取失败,且没有被检查点保存下来时,告诉SplitEnumerator分片枚举器,这些分片没有被处理*/void addSplitsBack(List<SplitT> splits, int subtaskId);
}
源阅读器(SourceReader)类:
public interface SourceReader<T, SplitT extends SourceSplit>extends AutoCloseable, CheckpointListener {/** * 启动一个源阅读器(SourceReader)都会先调用该方法*/void start();/*** 源阅读器(SourceReader)做数据处理,并将数据发送给下游*/InputStatus pollNext(ReaderOutput<T> output) throws Exception;/*** 定时的快照方法,也就是保存当前处理的情况,防止程序挂了,可以从检查点开始,而不是从头开始*/List<SplitT> snapshotState(long checkpointId);/*** 判断源阅读器(SourceReader)自己是否已经处理完数据,如果处理完,调用SourceReaderContext#sendSplitRequest()通知分片枚举器(SplitEnumerator)自己空闲*/CompletableFuture<Void> isAvailable();/*** 将数据分片(SourceSplit)添加到该源阅读器(SourceReader),当分片枚举器(SplitEnumerator)调用assignSplit方法时,会将数据分片(SourceSplit)传输到这个方法,源阅读器(SourceReader)接收数据分片(SourceSplit)*/void addSplits(List<SplitT> splits);/*** 当该源阅读器(SourceReader)不想再处理数据分片(SourceSplit)时,调用该方法。*/void notifyNoMoreSplits();/*** 处理SourceEvent。这个方法是用于监听SourceEvent事件,可用于与分片枚举器(SplitEnumerator)交互。*/default void handleSourceEvents(SourceEvent sourceEvent) {}/*** 当所有检查点执行完成时调用该方法*/@Overridedefault void notifyCheckpointComplete(long checkpointId) throws Exception {}}
数据分片(SourceSplit)类:
public interface SourceSplit {/*** 返回数据分片(SourceSplit)的ID*/String splitId();
}
1.3 源算子的流程
除了三大组件之外,还有2个类上下文的类要知道,分别是SplitEnumeratorContext和SourceReaderContext,它们分别是**分片枚举器(SplitEnumerator)和源阅读器(SourceReader)的上下文信息,通过它们来传递两者之间分配数据的信息。这是因为分片枚举器(SplitEnumerator)和源阅读器(SourceReader)**都是独立的进程或者线程,所以需要通过上下文来传递信息,下面是一个基本流程,顺便也说明每个类的方法在具体流程中的作用。(注意:由于是多进程或线程,因此他们之间顺序并不一定是这样子,只不过是按照一个正常逻辑画一个符合逻辑的版本)
这里还有关于检查点的实现没有列出来。其实可以简单理解检查点是另外一个线程处理,每个一段时间,会将目前的处理中间状态数据都保存下来,下次可以从保存的检查点加载,继续处理,无需重头开始。由于检查点还需后面介绍,这里就不多累述。
2 自定义数据源
接下来,使用一个简单示例,来演示如何自定义Source算子。在日常工作中,读取服务器日志并汇总到一个可视化平台供监控和分析是一个非常常见的场景。在这里假设一个场景:
有一个服务器,在固定的目录下面会按照日期不断生成日志,假设每个5分钟生成一个日志文件,由于日志是不断产生,因此常见写日志的方法是正在写的日志都会是以tmp为后缀文件,写完的日志才会被命名为log为后缀。示例需要读取该文件夹下面以log为后缀的日志,并按照文件分配给TaskManager的源阅读器(SourceReader)处理,源阅读器(SourceReader)将其日志的内容按行读取,然后传送到下游算子,读取完之后删除本地log日志,下游算子只是简单打印出来数据。这样子就模拟了一个采集服务器实时日志的过程。(注意,这里只是模拟从本地读取文件,实际业务中可能文件在远程服务器,这个就需要远程访问读取,这里为了方便就不演示远程读取)
解题思路:使用分片枚举器(SplitEnumerator)定时读取文件夹中的文件,分发给源阅读器(SourceReader),源阅读器(SourceReader)负责读取文件里面的数据,并输出给下一个算子。
代码参考lesson04子模块
2.1 新建lesson04子模块
1)在flink-study父项目的pom中引入dependencyManagement。引入hutool和fastjson作为工具类和序列化使用
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version>
</dependency>
2)新建lesson04子模块,其pom引入如下:
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><scope>provided</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency>
</dependencies>
2.2 定义检查点及其序列化类
1)新建检查点类CustomSplitEnumeratorCheckpoint (这就保存存储已加载的文件和待分配的数据分片(SourceSplit))
import java.util.LinkedList;
import java.util.Set;/*** 检查点,只需要保存已加载文件名称列表和用于保存待分配的数据分片(SourceSplit)*/
public class CustomSplitEnumeratorCheckpoint {/*** 存储已加载的文件*/private Set<String> loadedFiles;/*** 用于保存待分配的数据分片(SourceSplit)--因为还没有分配给源阅读器(SourceReader)处理*/private LinkedList<CustomSourceSplit> sourceSplitList;public CustomSplitEnumeratorCheckpoint(Set<String> loadedFiles, LinkedList<CustomSourceSplit> sourceSplitList) {this.loadedFiles = loadedFiles;this.sourceSplitList = sourceSplitList;}public Set<String> getLoadedFiles() {return loadedFiles;}public void setLoadedFiles(Set<String> loadedFiles) {this.loadedFiles = loadedFiles;}public LinkedList<CustomSourceSplit> getSourceSplitList() {return sourceSplitList;}public void setSourceSplitList(LinkedList<CustomSourceSplit> sourceSplitList) {this.sourceSplitList = sourceSplitList;}
}
2)新建检查点序列化类CustomEnumeratorCheckpointSerializer,这里使用fastjson来进行序列化
import com.alibaba.fastjson.JSON;
import org.apache.flink.core.io.SimpleVersionedSerializer;import java.io.IOException;/*** 检查点的序列化,由于检查点需要保存到本地磁盘,因此需要序列化。这里使用alibaba的fastjson组件做序列化*/
public class CustomEnumeratorCheckpointSerializer implements SimpleVersionedSerializer<CustomSplitEnumeratorCheckpoint> {@Overridepublic int getVersion() {return 0;}@Overridepublic byte[] serialize(CustomSplitEnumeratorCheckpoint obj) throws IOException {return JSON.toJSONBytes(obj);}@Overridepublic CustomSplitEnumeratorCheckpoint deserialize(int version, byte[] serialized) throws IOException {return JSON.parseObject(serialized, CustomSplitEnumeratorCheckpoint.class);}
}
2.3 定义三大组件
1)新建数据分片的类CustomSourceSplit
import org.apache.flink.api.connector.source.SourceSplit;/*** 数据分片(SourceSplit)*/
public class CustomSourceSplit implements SourceSplit {// 由于我们是按照文件分发的规则,因此数据分片(SourceSplit)我们只需要将文件的路径给源阅读器(SourceReader)即可private String path;public CustomSourceSplit() {}public CustomSourceSplit(String path) {this.path = path;}@Overridepublic String splitId() {return path;}public String getPath() {return path;}public void setPath(String path) {this.path = path;}
}
2)新建分片枚举器类CustomSplitEnumerator
/*** 分片枚举器(SplitEnumerator)*/
public class CustomSplitEnumerator implements SplitEnumerator<CustomSourceSplit, CustomSplitEnumeratorCheckpoint> {// 分片枚举器(SplitEnumerator)上下文,用于与源阅读器(SourceReader)通讯private SplitEnumeratorContext<CustomSourceSplit> splitSplitEnumeratorContext;// 定时查询文件夹的时间间隔private long refreshInterval;// 文件路径private File fileDir;// 文件过滤器private LogFileFilter logFileFilter;/*** 用于保存待分配的数据分片(SourceSplit)*/private LinkedList<CustomSourceSplit> sourceSplitList;/*** 用于保存空闲的CustomSourceReader*/private List<Integer> sourceReaderList;/*** 已加载文件名称列表*/private Set<String> loadedFiles;/*** 新建CustomSplitEnumerator*/public CustomSplitEnumerator(SplitEnumeratorContext<CustomSourceSplit> splitSplitEnumeratorContext, long refreshInterval, String path) {this.splitSplitEnumeratorContext = splitSplitEnumeratorContext;this.refreshInterval = refreshInterval;this.fileDir = new File(path);this.logFileFilter = new LogFileFilter();this.sourceSplitList = new LinkedList<>();this.sourceReaderList = new ArrayList<>();this.loadedFiles = new ConcurrentSkipListSet<>();}/*** 从某个检查点新建CustomSplitEnumerator*/public CustomSplitEnumerator(SplitEnumeratorContext<CustomSourceSplit> splitSplitEnumeratorContext, long refreshInterval, String path, LinkedList<CustomSourceSplit> sourceSplitList, Set<String> loadedFiles) {this.splitSplitEnumeratorContext = splitSplitEnumeratorContext;this.refreshInterval = refreshInterval;this.fileDir = new File(path);this.logFileFilter = new LogFileFilter();this.sourceSplitList = sourceSplitList;this.sourceReaderList = new ArrayList<>();this.loadedFiles = loadedFiles;}/*** 启动方法:这里使用SplitEnumeratorContext的callAsync方法启动一个定时任务(我们也可以自己写)*/@Overridepublic void start() {this.splitSplitEnumeratorContext.callAsync(this::loadFiles, // 定时轮询方法this::distributeSplit, // 如果读取到数据,则使用该方法进行分片数据refreshInterval, // 启动多少秒之后开始读取文件夹refreshInterval); // 间隔多少秒读取一次文件夹}/*** 定时扫描指定路径,如果发现新文件,返回数据*/private List<CustomSourceSplit> loadFiles() {File[] files = fileDir.listFiles(logFileFilter);if (files==null|| files.length == 0) {return new ArrayList<>();}//将加载出的文件放入loadedFiles并包装为分片(FileSourceSplit)返回List<CustomSourceSplit> result = new ArrayList<>();for(File file : files){// 加入到列表中,防止重复if(loadedFiles.add(file.getAbsolutePath())){result.add(new CustomSourceSplit(file.getAbsolutePath()));}}return result;}/*** 如果读取到数据,则使用该方法进行分片数据,其中assignSplit方法就是分配给源阅读器(SourceReader)*/private void distributeSplit(List<CustomSourceSplit> list, Throwable error) {sourceSplitList.addAll(list);if (sourceSplitList.isEmpty()) {return;}// 看看空闲的sourceReaderList列表有没有空闲的源阅读器(SourceReader)Iterator<Integer> iterator = sourceReaderList.iterator();while (iterator.hasNext()) {Integer next = iterator.next();// 拿出一个数据分片CustomSourceSplit poll = sourceSplitList.poll();if (poll == null) {break;}// 设置给源阅读器(SourceReader)splitSplitEnumeratorContext.assignSplit(poll, next);iterator.remove();}}/*** 源阅读器(SourceReader)调用SourceReaderContext#sendSplitRequest(),* 就是告诉分片枚举器(SplitEnumerator)自己空闲,分片枚举器(SplitEnumerator)可以将其加入到空闲列表*/@Overridepublic void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {sourceReaderList.add(subtaskId);}/*** 将数据分片(SourceSplit)添加回分片枚举器(SplitEnumerator)。* 只有在源阅读器(SourceReader)发生异常读取失败,且没有被检查点保存下来时,告诉SplitEnumerator分片枚举器,这些分片没有被处理*/@Overridepublic void addSplitsBack(List splits, int subtaskId) {sourceSplitList.addAll(splits);}/*** 添加源阅读器(SourceReader),这里在我们本次演示无需做任何内容*/@Overridepublic void addReader(int subtaskId) {}/*** 定时的快照方法,也就是保存当前处理的情况,防止程序挂了,可以从检查点开始,而不是从头开始*/@Overridepublic CustomSplitEnumeratorCheckpoint snapshotState(long checkpointId) throws Exception {return new CustomSplitEnumeratorCheckpoint(loadedFiles, sourceSplitList);}/*** 关闭方法,这里在我们本次演示无需做任何内容*/@Overridepublic void close() throws IOException {}/*** 处理SourceEvent。这个方法是用于监听SourceEvent事件,可用于与源阅读器(SourceReader)交互* 这里监听来自源阅读器(SourceReader)的CustomSourceEvent事件,删除loadedFiles中已经完成读取的文件,防止loadedFiles过大* 这个方法会将sourceReader已完成读取并已通过检查点的文件删除。*/@Overridepublic void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {if (sourceEvent instanceof CustomSourceEvent) {Set<String> fileSplits = ((CustomSourceEvent) sourceEvent).getSplits();loadedFiles.removeAll(fileSplits);}}/*** 过滤文件名,只读取log结尾的文件名且是文件的*/public static class LogFileFilter implements FileFilter {private String suffixName = ".log";public LogFileFilter() {}public LogFileFilter(String suffixName) {this.suffixName = suffixName;}@Overridepublic boolean accept(File pathname) {return pathname.isFile() && pathname.getName().endsWith(suffixName);}}
}
3)新建源阅读器类CustomSourceReader
import cn.hutool.core.io.FileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors;/*** 源阅读器(SourceReader)*/
public class CustomSourceReader implements SourceReader<String, CustomSourceSplit> {/*** 源阅读器(SourceReader)的上下文,用于跟分片枚举器(SplitEnumerator)通讯使用*/private final SourceReaderContext sourceReaderContext;/*** 用于存放从文件中读取的数据的,由于是按行读取,因此每一行存储一个String*/private Queue<String> waitQueue = new ConcurrentLinkedDeque<>();/*** 用于存放从分片枚举器(SplitEnumerator)接收到的分片*/private List<CustomSourceSplit> allSplit = new ArrayList<>();/*** 用于存储已读取完成的分片,可用于删除*/private List<CustomSourceSplit> finishSplit;/*** 用于标记数据是否读取完成*/private CompletableFuture<Void> completableFuture;public CustomSourceReader(SourceReaderContext sourceReaderContext) {this.sourceReaderContext = sourceReaderContext;this.completableFuture = new CompletableFuture<>();this.finishSplit = new ArrayList<>();}/*** 源阅读器(SourceReader)的启动,这里在我们本次演示无需做任何内容*/@Overridepublic void start() {}/*** 源阅读器(SourceReader)数据处理函数,这里从waitQueue中读到数据,并发送到下游算子,不做任何处理*/@Overridepublic InputStatus pollNext(ReaderOutput<String> output) throws Exception {if (!waitQueue.isEmpty()) {String poll = waitQueue.poll();if (StringUtils.isNotEmpty(poll)) {output.collect(poll);}/*** MORE_AVAILABLE - SourceReader 有可用的记录。* NOTHING_AVAILABLE - SourceReader 现在没有可用的记录,但是将来可能会有记录可用。* END_OF_INPUT - SourceReader 已经处理完所有记录,到达数据的尾部。这意味着 SourceReader 可以终止任务了。*/return InputStatus.MORE_AVAILABLE;}return InputStatus.NOTHING_AVAILABLE;}/*** 保存快照方法*/@Overridepublic List<CustomSourceSplit> snapshotState(long checkpointId) {if (allSplit.isEmpty()) {return new ArrayList<>();}synchronized (allSplit) {finishSplit.addAll(allSplit);allSplit.clear();}return finishSplit;}/*** 是否已经处理完数据,如果处理完,调用SourceReaderContext#sendSplitRequest()通知分片枚举器(SplitEnumerator)自己空闲可以分配新数据*/@Overridepublic CompletableFuture<Void> isAvailable() {if (this.completableFuture.isDone()) {this.completableFuture = new CompletableFuture<>();}//分片枚举器(SplitEnumerator)this.sourceReaderContext.sendSplitRequest();return this.completableFuture;}/*** 拿到分片枚举器(SplitEnumerator)分给自己的数据分片(SourceSplit)* 按行读取文件并将读取每行数据放到该源阅读器(SourceReader)的waitQueue,供pollNext处理数据*/@Overridepublic void addSplits(List<CustomSourceSplit> splits) {for (CustomSourceSplit split : splits) {//按行读取数据List<String> lines = FileUtil.readLines(split.getPath(), StandardCharsets.UTF_8);waitQueue.addAll(lines);}//设置数据已经读取完成this.completableFuture.complete(null);//将分片加入到allSplitsynchronized (allSplit) {allSplit.addAll(splits);}}/*** 当该源阅读器(SourceReader)不想再处理数据分片(SourceSplit)时,调用该方法。这里在我们本次演示无需做任何内容*/@Overridepublic void notifyNoMoreSplits() {}/*** 当该源阅读器(SourceReader)关闭时,调用该方法。这里在我们本次演示无需做任何内容*/@Overridepublic void close() throws Exception {}/*** 当所有检查点执行完成时调用该方法,这里我们清理掉已经读取的日志文件*/@Overridepublic void notifyCheckpointComplete(long checkpointId) throws Exception {if (finishSplit.isEmpty()) {return;}//将已完成检查点的数据删除Set<String> deleteFile = finishSplit.stream().map(it -> {FileUtil.del(it.getPath());return it.getPath();}).collect(Collectors.toSet());finishSplit.clear();//将已删除的文件通过事件通知分片枚举器(SplitEnumerator),分片枚举器(SplitEnumerator)可以删除loadedFile列表this.sourceReaderContext.sendSourceEventToCoordinator(new CustomSourceEvent(deleteFile));}
}
4)新建事件类CustomSourceEvent,用于通知使用:
import org.apache.flink.api.connector.source.SourceEvent;import java.util.Set;/*** SourceEvent事件,用于源阅读器(SourceReader)删除文件后通知分片枚举器(SplitEnumerator)*/
public class CustomSourceEvent implements SourceEvent {private Set<String> splits;public CustomSourceEvent() {}public CustomSourceEvent(Set<String> splits) {this.splits = splits;}public Set<String> getSplits() {return splits;}public void setSplits(Set<String> splits) {this.splits = splits;}
}
5)新建数据分片的序列化类CustomSplitSerializer
/*** 数据分片的序列化,由于分片枚举器(SplitEnumerator)和源阅读器(SourceReader)可能在不同服务器,因此数据分片需要序列化传输。这里使用alibaba的fastjson组件做序列化*/
public class CustomSplitSerializer implements SimpleVersionedSerializer<CustomSourceSplit> {@Overridepublic int getVersion() {return 0;}@Overridepublic byte[] serialize(CustomSourceSplit obj) throws IOException {return JSON.toJSONBytes(obj);}@Overridepublic CustomSourceSplit deserialize(int version, byte[] serialized) throws IOException {return JSON.parseObject(serialized, CustomSourceSplit.class);}
}
2.4 定义Source和main方法类
1)新建源算子类CustomSource
/*** 自定义的Source*/
public class CustomSource implements Source<String, CustomSourceSplit, CustomSplitEnumeratorCheckpoint> {// 文件路径private final String dir;// 定时查询文件夹的时间间隔private final long refreshInterval;public CustomSource(String dir, long refreshInterval) {this.dir = dir;this.refreshInterval = refreshInterval;}/*** 设置该源算子是有界流还是无界流*/@Overridepublic Boundedness getBoundedness() {return Boundedness.CONTINUOUS_UNBOUNDED;}/*** 创建分片枚举器(SplitEnumerator)*/@Overridepublic SplitEnumerator<CustomSourceSplit, CustomSplitEnumeratorCheckpoint> createEnumerator(SplitEnumeratorContext<CustomSourceSplit> enumContext) throws Exception {return new CustomSplitEnumerator(enumContext, refreshInterval, dir);}/*** 从某个检查点创建分片枚举器(SplitEnumerator),*/@Overridepublic SplitEnumerator<CustomSourceSplit, CustomSplitEnumeratorCheckpoint> restoreEnumerator(SplitEnumeratorContext<CustomSourceSplit> enumContext, CustomSplitEnumeratorCheckpoint checkpoint) throws Exception {return new CustomSplitEnumerator(enumContext, refreshInterval, dir, checkpoint.getSourceSplitList(), checkpoint.getLoadedFiles());}/*** 创建数据分片(SourceSplit)的序列化类*/@Overridepublic SimpleVersionedSerializer<CustomSourceSplit> getSplitSerializer() {return new CustomSplitSerializer();}/*** 创建检查点的序列化类*/@Overridepublic SimpleVersionedSerializer<CustomSplitEnumeratorCheckpoint> getEnumeratorCheckpointSerializer() {return new CustomEnumeratorCheckpointSerializer();}/*** 创建源阅读器(SourceReader)*/@Overridepublic SourceReader<String, CustomSourceSplit> createReader(SourceReaderContext readerContext) throws Exception {return new CustomSourceReader(readerContext);}
}
2)新建Flink的执行类CustomSourceDemo
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Flink的执行类*/
public class CustomSourceDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointInterval(60000); // 配置每60秒自动保存一个检查点// 2. 读取数据 -- 使用我们自定义的数据源。读取E:\mylogs目录,并且设置每隔5秒读取一次目录里面的文件列表CustomSource customSource = new CustomSource("E:\\mylogs", 10000);DataStreamSource<String> dataStreamSource = env.fromSource(customSource, WatermarkStrategy.noWatermarks(), "custom");dataStreamSource.setParallelism(1); // 这里配置1个并行度,只是为了方便debug// 3. 输出dataStreamSource.print();// 执行env.execute();}}
2.5 运行测试
1)在E:\mylogs目录下建立一些以log结尾的文件,并输入内容。还可以建立非log结尾的目录以测试是否会被读取。
2)运行CustomSourceDemo:
上面的示例基本实现了前面定下来的需求,但是你会看到整个过程还是挺复杂,而示例其实还是比较简单的处理方式,中间很多细节还需打磨。Flink也提供SourceReaderBase等更高层的实现类,封装解决底层的一些常见问题,所以大家不必从最底层的SourceReader开始。
结语:通过本章对Data Stream API的源算子底层原理做了一个讲解,并通过一个实际案例自定义源算子。接下来还继续讲解Data Stream API的其它相关内容。
相关文章:
Flink 系列之七 - Data Stream API的源算子原理
之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,…...
使用 SSE + WebFlux 推送日志信息到前端
为什么使用 SSE 而不使用 WebSocket, 请看 SEE 对比 Websocket 的优缺点。 特性SSEWebSocket通信方向单向(服务器→客户端)双向(全双工)协议基于 HTTP独立协议(需 ws:// 前缀)兼容性现代浏览器(…...
Java多线程同步有哪些方法?
大家好,我是锋哥。今天分享关于【Java多线程同步有哪些方法?】面试题。希望对大家有帮助; Java多线程同步有哪些方法? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在 Java 中,多线程同步是确保多个线程在访问共享资源时不会…...
Java—数 组
数组就是一个容器,用来存一批同种类型的数据。 一、静态初始化数组 1.1 定义方式 语法: 完整格式:数据类型 [ ] 数组名 new 数据类型 []{ 元素 1 ,元素 2 ,元素3… };简化格式:数据类型 [ ] 数组名 {…...
iOS/Android 使用 C++ 跨平台模块时的内存与生命周期管理
在移动应用开发领域,跨平台开发已经成为一种不可忽视的趋势。随着智能手机市场的持续扩张,开发者需要同时满足iOS和Android两大主流平台的需求,而这往往意味着重复的工作量和高昂的维护成本。跨平台开发的目标在于通过一套代码库实现多平台的支持,从而降低开发成本、加速产…...
为什么vue的key值,不用index?
在 Vue 中,key 的作用是帮助框架高效地识别和复用 DOM 节点或组件实例。使用数组索引 (index) 作为 key 值可能会导致以下问题,因此通常不建议这样做: 1. 列表数据变化时,可能导致错误的 DOM 复用 问题:当列表的顺序…...
Hi3516CV608 超高清智慧视觉 SoC 芯片 可提供开发资料
Hi3516CV608 超高清智慧视觉SoC 产品简介 总体介绍 Hi3516CV608是一颗面向消费类市场的IPC SoC,在新一代视频编解码标准、网络安全、隐私保护和人工智能方面引领行业发展。主要应用于室内外场景下的云台机、枪机、球机、枪球一体机、双目长短焦机等产品形态&#…...
Flink部署与应用——部署方式介绍
引入 我们通过Flink相关论文的介绍,对于Flink已经有了初步理解,这里简单的梳理一下Flink常见的部署方式。 Flink 的部署方式 StandAlone模式 介绍 StandAlone模式是Flink框架自带的分布式部署模式,不依赖其他的资源调度框架,…...
数据挖掘技术与应用课程论文——数据挖掘中的聚类分析方法及其应用研究
数据挖掘中的聚类分析方法及其应用研究 摘要 聚类分析是数据挖掘技术中的一个重要组成部分,它通过将数据集中的对象划分为多个组或簇,使得同一簇内的对象具有较高的相似性,而不同簇之间的对象具有较低的相似性。 本文系统地研究了数据挖掘中的多种聚类分析方法及其应用。首先…...
SIEMENS PLC程序解读 ST 语言 车型识别
1、ST程序代码 IF #Type1_MIX < #CFG_Type.Type.CT AND #CFG_Type.Type.CT < #Type1_MAX AND #CFG_Type.Type.CT<>0 THEN#Type[1] : 1;FOR #I : 0 TO 39 DOIF #CFG_Type.Type.CT/10 (#Type1_MIX 10 * #I)/10 THEN#Sub_Type."1"[#I 1] : 1;END_IF; E…...
神经网络基础[损失函数,bp算法,梯度下降算法 ]
关于神经网络的基础的概念可以看我前面的文章 损失函数 在深度学习中, 损失函数是用来衡量模型参数的质量的函数, 衡量的方式是比较网络输出和真实输出的差异 作用:指导模型的训练过程,通过反向传播算法计算梯度,从而更新网络的参数,最终使…...
python打印颜色(python颜色、python print颜色、python打印彩色文字、python print彩色、python彩色文字)
文章目录 python怎么打印彩色文字1. 使用ANSI转义码:2. 使用colorama库(更好的跨平台支持):3. 使用termcolor库: python怎么打印彩色文字 在Python中打印彩色文字有几种方法: 1. 使用ANSI转义码ÿ…...
数字域残留频偏的补偿原理
模拟域的频谱搬移一般通过混频器实现。一般情况下模拟域调整完频偏后数字域还会存在一部分残留频偏这部分就需要在数字域补偿。原理比较简单本文进行下粗略总结。首先我们需要了解下采样具体可参考下信号与系统笔记(六):采样 - 知乎。 采样前和采样后,角…...
Linux文件管理2
Linux 文件管理是系统操作的核心内容之一,涉及文件和目录的创建、删除、移动、查看、权限管理等操作。以下是 Linux 文件管理的核心知识点和常用操作总结: 一、文件系统结构 Linux 文件系统采用 树形结构,以 /(根目录࿰…...
C++----模拟实现string
模拟实现string,首先我们要知道成员变量有哪些: class _string{private:char* _str;size_t capacity;//空间有多大size_t size;//有效字符多少const static size_t npos;};const size_t _string::npos-1;//static在外面定义不需要带static,np…...
Python torch.optim.lr_scheduler 常用学习率调度器使用方法
在看学习率调度器之前,我们先看一下学习率的相关知识: 学习率 学习率的定义 学习率(Learning Rate)是深度学习中一个关键的超参数,它决定了在优化算法(如梯度下降法)更新模型参数时࿰…...
从零开始学Python游戏编程39-碰撞处理1
在《从零开始学Python游戏编程38-精灵5》代码的基础上,添加两个敌人的防御塔,玩家的坦克无法移动到防御塔所在的空格中,如图1所示。 图1 游戏中的碰撞处理 1 游戏中空格的坐标 在《从零开始学Python游戏编程36-精灵3》中提到,可…...
同步定时器的用户数要和线程组保持一致,否则jmeter会出现接口不执行’stop‘和‘×’的情况
调试压测时发现了一个问题就是线程计划总是出现‘stop’的按钮无法执行完毕 发现时同步定时器导致的,就是有接口使用了同步定时器,但是这个同步定时器的用户数量设置的<线程组用户数量时,会出现执行无法结束的情况,如下…...
如何在Linux用libevent写一个聊天服务器
废话少说,先看看思路 因为libevent的回调机制,我们可以借助这个机制来创建bufferevent来实现用户和用户进行通信 如果成功连接后我们可以直接在listener回调函数里创建一个bufferevent缓冲区,并为每个缓冲区设置相应的读回调和事件回调&…...
Virtuoso ADE采用Spectre仿真中出现MOS管最小长宽比满足要求依然报错的情况解决方法
在ADE仿真中错误问题如下: ERROR (CMI-2440): "xxx.scs" 46338: I2.M1: The length, width, or area of the instance does not fit the given lmax-lmin, wmax-wmin, or areamax-areamin range for any model in the I2.M3.nch_hvt group. The channel w…...
防火墙原理与应用总结
防火墙介绍: 防火墙(Firewall)是一种网络安全设备,其核心目标是通过分析数据包的源地址、端口、协议等内容,保护一个网络区域免受来自另一个网络区域的网络攻击和网络入侵行为,同时允许合法流量自由通行。…...
Graph Database Self-Managed Neo4j 知识图谱存储实践2:通过官方新手例子入门(未完成)
官方入门例子:neo4j-graph-examples/get-started: An introduction to graph databases and Neo4j for new users 官方例子仓库:https://github.com/neo4j-graph-examples 下载数据 git clone https://github.com/neo4j-graph-examples/get-started …...
GIT下载步骤
git官方链接: 添加链接描述...
C++中的vector和list的区别与适用场景
区别 特性vectorlist底层实现动态数组双向链表内存分配连续内存块非连续内存块随机访问支持,通过索引访问,时间复杂度O(1)不支持,需遍历,时间复杂度O(n)插入/删除末尾操作效率高,时间复杂度O(1)任意位置操作效率高&am…...
软件测试入门学习笔记
今天学习新知识,软件测试。 什么是软件测试? 使用人工和自动手段来运行或测试某个系统的过程,目的在于检验它是否满足规定的需求或弄清实际结果与预期结果之间的差别。 软件测试的目的? 1)为了发现程序࿰…...
2025年深度学习模型发展全景透视(基于前沿技术突破与开源生态演进的交叉分析)
2025年深度学习模型发展全景透视 (基于前沿技术突破与开源生态演进的交叉分析) 一、技术突破与能力边界拓展 智能水平跃升 2025年开源模型如Meta Llama-4、阿里Qwen2.5-VL参数规模突破1300亿,在常识推理能力测试中首次超越人类基准线7.2%谷歌…...
时间复杂度分析
复杂度分析的必要性: 当给我们一段代码时,我们是以什么准则来判断代码效率的高低呢?每一段代码都会消耗一段时间,或占据一段数据空间,那么自然是在实现相同功能的情况下,代码所耗时间最少,所占…...
BGE-m3 和 BCE-Embedding 模型对比分析
以下是对 BGE-m3 和 BCE-Embedding 模型在 embedding 领域的多维度对比分析,基于公开的技术文档和实验数据: 1. 基础信息对比 维度BGE-m3 (智源研究院)BCE-Embedding (网易)发布时间2024 年 1 月2023 年 9 月模型架构Transformer-basedTransformer-base…...
题目 3320: 蓝桥杯2025年第十六届省赛真题-产值调整
题目 3320: 蓝桥杯2025年第十六届省赛真题-产值调整 时间限制: 2s 内存限制: 192MB 提交: 549 解决: 122 题目描述 偏远的小镇上,三兄弟共同经营着一家小型矿业公司 “兄弟矿业”。公司旗下有三座矿山:金矿、银矿和铜矿,它们的初始产值分别用…...
计算机组成原理第二章 数据的表示和运算——2.1数制与编码
计算机组成原理第二章 数据的表示和运算——数制与编码 一、基本概念与核心知识点 1.1 数制系统基础 1.1.1 进位计数制 定义:以固定基数(如2、8、10、16)表示数值的系统核心要素: 基数(R):允…...
基于归纳共形预测的大型视觉-语言模型中预测集的**数据驱动校准**
摘要 本研究通过分离共形预测(SCP)框架,解决了大型视觉语言模型(LVLMs)在视觉问答(VQA)任务中幻觉缓解的关键挑战。虽然LVLMs在多模态推理方面表现出色,但它们的输出常常表现出具有…...
Golang | 自行实现并发安全的Map
核心思路,读写map之前加锁!哈希思路,大map化分为很多个小map...
【Python数据库编程实战】从SQL到ORM的完整指南
目录 前言技术背景与价值当前技术痛点解决方案概述目标读者说明 一、技术原理剖析核心概念图解核心作用讲解关键技术模块说明技术选型对比 二、实战演示环境配置要求核心代码实现案例1:SQLite基础操作案例2:MySQL连接池案例3:SQLAlchemy ORM …...
深入剖析扣子智能体的工作流与实战案例
前面我们已经初步带大家体验过扣子工作流,工作流程是 Coze 最为强大的功能之一,它如同扣子中蕴含的奇妙魔法工具,赋予我们的机器人处理极其复杂问题逻辑的能力。 这篇文章会带你更加深入地去理解并运用工作流解决实际问题 目录 一、工作流…...
【计算机网络】IP地址
IPv4 五类地址 1.0.0.0 ~ 126.255.255.255A类子网8位,主机24位128.0.0.0 ~ 191.255.255.255B类子网16位,主机16位192.0.0.0 ~ 223.255.255.255C类子网24位,主机8位224.0.0.0 ~ 239.255.255.255D类不分网络地址和主机地址,作为组播…...
基于CATIA参数化管道建模的自动化插件开发实践——NX建模之管道命令的参考与移植
引言 在机械设计领域,CATIA作为行业领先的CAD软件,其强大的参数化建模能力备受青睐。本文介绍如何利用Python的PySide6框架与CATIA二次开发技术,开发一款智能管状体生成工具。该工具借鉴了同类工业软件NX的建模的管道命令,通过Py…...
运维之SSD硬盘(SSD hard Drive for Operation and Maintenance)
背景 SSD的产生背景是计算技术发展和市场需求驱动的结果。早期计算机使用磁芯存储器,后来被半导体存储器取代,提高了速度和可靠性。随着电子设备小型化,对轻便、低功耗存储器的需求增长,SSD因无机械部件、速度快、耗电少而受到关…...
基于javaweb的SSM+Maven红酒朔源管理系统设计与实现(源码+文档+部署讲解)
技术范围:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论文…...
HTML 地理定位(Geolocation)教程
HTML 地理定位(Geolocation)教程 简介 HTML5 的 Geolocation API 允许网页应用获取用户的地理位置信息。这个功能可用于提供基于位置的服务,如导航、本地搜索、天气预报等。本教程将详细介绍如何在网页中实现地理定位功能。 工作原理 浏览器可以通过多种方式确定…...
RHEL与CentOS:从同源到分流的开源操作系统演进
RHEL与CentOS:从同源到分流的开源操作系统演进 一、核心关系:源代码的重构与社区化 RHEL(Red Hat Enterprise Linux)与CentOS(Community ENTerprise Operating System)的关系可以概括为“同源异构”。RHE…...
架构师面试(三十六):广播消息
题目 在像 IM、短视频、游戏等实时在线类的业务系统中,一般会有【广播消息】业务,这类业务具有瞬时高流量的特点。 在对【广播消息】业务实现时通常需要同时写 “系统消息库” 和更新用户的 “联系人库” 的操作,用户的联系人表中会有未读数…...
Spine 动画教程:皮肤制作
一、前言 搁了很久的抖音直播小玩法开发,最近又让我想起来了。由于是初次尝试,所以我将开发费用的预算降到为零。不但不买服务器采用 UnitySDK 的指令直推,而且游戏的资产也用 AI 生成,主打省时又省钱。 但是图片有了࿰…...
Rust 学习笔记:函数和控制流
Rust 学习笔记:函数和控制流 Rust 学习笔记:函数和控制流函数(Function)语句和表达式带返回值的函数注释控制流if 表达式使用 else if 处理多个条件在 let 语句中使用 if循环loop从循环中返回值循环标签消除多个循环之间的歧义带 …...
探秘LLM推理模型:hidden states中藏着的self verification的“钥匙”
推理模型在数学和逻辑推理等任务中表现出色,但常出现过度推理的情况。本文研究发现,推理模型的隐藏状态编码了答案正确性信息,利用这一信息可提升推理效率。想知道具体如何实现吗?快来一起来了解吧! 论文标题 Reasoni…...
《Learning Langchain》阅读笔记8-RAG(4)在vector store中存储embbdings
什么是 vector store? 与专门用于存储结构化数据(如 JSON 文档或符合关系型数据库模式的数据)的传统数据库不同,vector stores处理的是非结构化数据,包括文本和图像。像传统数据库一样,vector stores也能执…...
【C/C++】深入理解指针(五)
文章目录 深入理解指针(五)1.回调函数是什么?2.qsort使用举例2.1 使用qsort函数排序整型数据强调 2.2 使用qsort排序结构数据 3.qsort函数的模拟实现 深入理解指针(五) 1.回调函数是什么? 回调函数就是⼀个通过函数指针调⽤的函数。 如果你把函数的指…...
【vue】【element-plus】 el-date-picker使用cell-class-name进行标记,type=year不生效解决方法
typedete,自定义cell-class-name打标记效果如下: 相关代码: <el-date-pickerv-model"date":clearable"false":editable"false":cell-class-name"cellClassName"type"date"format&quo…...
RocketMQ 主题与队列的协同作用解析(既然队列存储在不同的集群中,那要主题有什么用呢?)---管理命令、配置安装
学习之前呢需要会使用linux的基础命令 一.RocketMQ 主题与队列的协同作用解析 在 RocketMQ 中,主题(Topic)与队列(Queue)的协同设计实现了消息系统的逻辑抽象与物理存储分离。虽然队列实际存储在不同集群的 B…...
解决视频处理中的 HEVC 解码错误:Could not find ref with POC xxx【已解决】
问题描述 今天在使用 Python 处理视频时遇到了以下错误: [hevc 0x7f8a1d02b7c0] Could not find ref with POC 33之前没接触过视频处理,查了一下,这个错误通常发生在处理 HEVC(H.265)编码 的视频时,原因…...
NEGATIVE LABEL GUIDED OOD DETECTION WITH PRETRAINED VISION-LANGUAGE MODELS
1. 介绍: 这篇论文也是基于CLIP通过后处理的方法实现的OOD的检测,但是设计点在于,之前的方法是使用的ID的类别,这篇工作是通过添加一些在语义上非常不同于ID的类别的外分布类来做的OOD检测。 CLIP做OOD检测的这个系列里面我看的以及记录的第一篇就是MCM的方法,这也是确实是…...