Flink--API 之Transformation-转换算子的使用解析
目录
一、常用转换算子详解
(一)map 算子
(二)flatMap 算子
(三)filter 算子
(四)keyBy 算子
元组类型
POJO
(五)reduce 算子
二、合并与连接操作
(一)union 算子
(二)connect 算子
三、侧输出流(Side Outputs)
四、总结
在大数据处理领域,Apache Flink 凭借其强大的流处理和批处理能力备受青睐。而转换算子作为 Flink 编程模型中的关键部分,能够对数据进行灵活多样的处理操作,满足各种复杂业务场景需求。本文将深入介绍 Flink 中常见的转换算子,包括 map、flatMap、filter、keyBy、reduce 等,并结合详细代码示例讲解其使用方法,同时探讨 union、connect 等合并连接操作以及侧输出流等特性,帮助读者全面掌握 Flink 转换算子的精髓。
一、常用转换算子详解
(一)map 算子
功能概述
map 算子主要用于对输入流中的每个元素进行一对一的转换操作,基于用户自定义的映射逻辑将输入元素转换为新的输出元素。
代码示例
假设我们有一份访问日志数据,格式如下:
86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
我们要将其转换为一个 LogBean
对象,包含访问 ip
、用户 userId
、访问时间戳 timestamp
、访问方法 method
、访问路径 path
等字段。
假如需要用到日期工具类,可以导入lang3包:
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version>
</dependency>
以下是代码实现:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;public class MapDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip; // 访问ipint userId; // 用户idlong timestamp; // 访问时间戳String method; // 访问方法String path; // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});//4. sink-数据输出mapStream.print();//5. execute-执行env.execute();}
}
在上述代码中,通过 map
函数的自定义逻辑,将每行日志字符串按空格拆分后,进行相应字段提取与时间戳转换,最终封装成 LogBean
对象输出。
第二个版本:
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;@Data
@AllArgsConstructor
class LogBean{private String ip; // 访问ipprivate int userId; // 用户idprivate long timestamp; // 访问时间戳private String method; // 访问方法private String path; // 访问路径
}
public class Demo04 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split("\\s+");//时间戳转换 17/05/2015:10:06:53String time = arr[2];SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = format.parse(time);long timeStamp = date.getTime();return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);}});//4. sink-数据输出map.print();//5. execute-执行env.execute();}
}
(二)flatMap 算子
功能概述
flatMap 算子可以将输入流中的每个元素转换为零个、一个或多个输出元素。它适用于需要对输入元素进行展开、拆分等操作的场景。
代码示例
假设有数据格式为“张三,苹果手机,联想电脑,华为平板”这样的文本文件,我们要将其转换为“张三有苹果手机”“张三有联想电脑”“张三有华为平板”等形式。
flatmap.log文件如:
张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板
代码如下:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");//3. transformation-数据处理转换DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {//张三,苹果手机,联想电脑,华为平板String[] arr = line.split(",");String name = arr[0];for (int i = 1; i < arr.length; i++) {String goods = arr[i];collector.collect(name+"有"+goods);}}});//4. sink-数据输出flatMapStream.print();//5. execute-执行env.execute();}
}
这里在 flatMap
函数内部,按逗号拆分每行数据,遍历拆分后的数组(除第一个元素作为名称外),通过 collector
将新组合的字符串收集输出。
(三)filter 算子
功能概述
filter 算子依据用户定义的过滤条件,对输入流元素进行筛选,满足条件的元素继续向下游传递,不满足的则被过滤掉。
代码示例
读取map算子中的访问日志数据,过滤出访问 IP
是 83.149.9.216
的访问日志,代码实现如下:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class FilterDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip; // 访问ipint userId; // 用户idlong timestamp; // 访问时间戳String method; // 访问方法String path; // 访问路径}public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");// 数据处理转换DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {String ip = line.split(" ")[0];return ip.equals("83.149.9.216");}});// 数据输出filterStream.print();// 执行env.execute();}
}
在 filter
函数中,通过拆分每行日志获取 IP
地址,并与目标 IP
进行比较决定是否保留该条日志数据。
(四)keyBy 算子
功能概述
keyBy 算子在流处理中用于对数据按照指定的键进行分组,类似于 SQL 中的 group by
,后续可基于分组进行聚合等操作,支持对元组类型和 POJO
类型数据按不同方式指定分组键。
流处理中没有groupBy,而是keyBy
KeySelector对象可以支持元组类型,也可以支持POJO[Entry、JavaBean]
元组类型
单个字段keyBy
//用字段位置(已经被废弃)
wordAndOne.keyBy(0)//用字段表达式
wordAndOne.keyBy(v -> v.f0)
多个字段keyBy
//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});
类似于sql中的group by
select sex,count(1) from student group by sex;
group by 后面也可以跟多个字段进行分组,同样 keyBy 也支持使用多个列进行分组
POJO
public class PeopleCount {private String province;private String city;private Integer counts;public PeopleCount() {}//省略其他代码。。。
}
单个字段keyBy
source.keyBy(a -> a.getProvince());
多个字段keyBy
source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});
代码示例
假设有数据表示不同球类的数量,格式为 Tuple2
(球类名称,数量),如 Tuple2.of("篮球", 1)
等,需求是统计篮球、足球各自的总数量。代码如下:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("篮球", 1),Tuple2.of("篮球", 2),Tuple2.of("篮球", 3),Tuple2.of("足球", 3),Tuple2.of("足球", 2),Tuple2.of("足球", 3));// 数据处理转换/*KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> tuple2) throws Exception {return tuple2.f0;}});*/KeyedStream<Tuple2<String, Integer>, String> keyedStream = dataStream.keyBy(tuple -> tuple.f0);keyedStream.sum(1).print();// 执行env.execute();}
}
这里通过 lambda
表达式指定 Tuple2
的第一个元素(球类名称)作为分组键,对数据分组后使用 sum
聚合统计每种球类的数量总和。
pojo演示
package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-13 14:32:52**/
public class Demo07 {@Data@AllArgsConstructorstatic class Ball{private String ballName;private int num;}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出// 以下演示数据是pojoDataStreamSource<Ball> ballSource = env.fromElements(new Ball("篮球", 1),new Ball("篮球", 2),new Ball("篮球", 3),new Ball("足球", 3),new Ball("足球", 2),new Ball("足球", 3));ballSource.keyBy(ball -> ball.getBallName()).print();ballSource.keyBy(new KeySelector<Ball, String>() {@Overridepublic String getKey(Ball ball) throws Exception {return ball.getBallName();}});//5. execute-执行env.execute();}
}
(五)reduce 算子
功能概述
reduce 算子可对一个数据集或一个分组进行聚合计算,将多个元素逐步合并为一个最终元素,常用于求和、求最值等场景,sum
底层其实也是基于 reduce
实现。
代码示例
读取访问日志,统计每个 IP
地址的访问 PV
数量,代码如下:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;public class ReduceDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip; // 访问ipint userId; // 用户idlong timestamp; // 访问时间戳String method; // 访问方法String path; // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 此处也可以将数据放入到tuple中,tuple可以支持到tuple25DataStream<LogBean> mapStream = fileStream.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split(" ");String ip = arr[0];int userId = Integer.valueOf(arr[1]);String createTime = arr[2];// 如何将一个时间字符串变为时间戳// 17/05/2015:10:05:30/*SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = simpleDateFormat.parse(createTime);long timeStamp = date.getTime();*/// 要想使用这个common.lang3 下的工具类,需要导入包Date date = DateUtils.parseDate(createTime, "dd/MM/yyyy:HH:mm:ss");long timeStamp = date.getTime();String method = arr[3];String path = arr[4];LogBean logBean = new LogBean(ip, userId, timeStamp, method, path);return logBean;}});DataStream<Tuple2<String, Integer>> mapStream2 = mapStream.map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return Tuple2.of(logBean.getIp(), 1);}});//4. sink-数据输出KeyedStream<Tuple2<String,Integer>, String> keyByStream = mapStream2.keyBy(tuple -> tuple.f0);// sum的底层是 reduce// keyByStream.sum(1).print();// [ ("10.0.0.1",1),("10.0.0.1",1),("10.0.0.1",1) ]keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {// t1 => ("10.0.0.1",10)// t2 => ("10.0.0.1",1)return Tuple2.of(t1.f0, t1.f1 + t2.f1);}}).print();//5. execute-执行env.execute();}
}
先将日志数据处理成包含 IP
和计数 1
的 Tuple2
格式,按 IP
分组后,在 reduce
函数中对相同 IP
的计数进行累加,得到每个 IP
的访问 PV
数。
二、合并与连接操作
(一)union 算子
功能概述
union 算子能够合并多个同类型的流,将多个 DataStream
合并成一个 DataStream
,但注意合并时流的类型必须一致,且不会对数据去重。
代码示例
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");// 合并流DataStream<String> unionStream = stream1.union(stream2);unionStream.print();// 执行env.execute();}
}
上述代码将两个包含字符串元素的流进行合并输出。
(二)connect 算子
功能概述
connect 算子可连接 2 个不同类型的流,连接后形成 ConnectedStreams
,内部两个流保持各自的数据和形式独立,之后需通过自定义处理逻辑(如 CoMapFunction
等)处理后再输出,且处理后的数据类型需相同。
代码示例
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class UnionConnectDemo {public static void main(String[] args) throws Exception {// 准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<Long> stream3 = env.fromSequence(1, 10);// 连接流ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);// 处理流DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {@Overridepublic String map1(String value) throws Exception {return value;}@Overridepublic String map2(Long value) throws Exception {return Long.toString(value);}});// 输出mapStream.print();// 执行env.execute();}
}
这里连接了一个字符串流和一个长整型序列流,通过 CoMapFunction
分别将字符串按原样、长整型转换为字符串后合并输出。
三、侧输出流(Side Outputs)
功能概述
侧输出流可根据自定义规则对输入流数据进行分流,将满足不同条件的数据输出到不同的“分支”流中,方便后续针对性处理。
代码示例
以下示例对流中的数据按照奇数和偶数进行分流并获取分流后的数据:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SideOutputExample {public static void main(String[] args) throws Exception {// 1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 侧道输出流// 从0到100生成一个Long类型的数据流作为示例数据DataStreamSource<Long> streamSource = env.fromSequence(0, 100);// 定义两个标签,用于区分偶数和奇数的侧输出流OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));// 2. source-加载数据// 使用ProcessFunction来处理每个元素,决定将其输出到哪个侧输出流或者主输出流SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {@Overridepublic void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {// value代表每一个数据,判断是否为偶数if (value % 2 == 0) {// 如果是偶数,将其输出到偶数的侧输出流中ctx.output(tag_even, value);} else {// 如果是奇数,将其输出到奇数的侧输出流中ctx.output(tag_odd, value);}}});// 3. 获取奇数的所有数据,从主输出流中获取对应标签(tag_odd)的侧输出流数据DataStream<Long> sideOutput = process.getSideOutput(tag_odd);sideOutput.print("奇数:");// 获取所有偶数数据,同样从主输出流中获取对应标签(tag_even)的侧输出流数据DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);sideOutput2.print("偶数:");// 4. sink-数据输出(这里通过打印展示了侧输出流的数据,实际应用中可对接其他下游操作)// 5. 执行任务env.execute();}
}
在上述代码中:
- 首先,我们创建了
StreamExecutionEnvironment
来准备 Flink 的执行环境,并设置了运行模式为自动(AUTOMATIC
)。- 接着,通过
fromSequence
方法生成了一个从0
到100
的Long
类型的数据流作为示例的输入数据。- 然后,定义了两个
OutputTag
,分别用于标记偶数和奇数的侧输出流,并且指定了对应的类型信息(这里都是Long
类型)。- 在
process
函数中,针对每个输入的元素(value
),通过判断其是否能被2
整除来决定将其输出到对应的侧输出流中。如果是偶数,就通过ctx.output(tag_even, value)
将其发送到偶数侧输出流;如果是奇数,则通过ctx.output(tag_odd, value)
发送到奇数侧输出流。- 最后,通过
getSideOutput
方法分别获取奇数和偶数侧输出流的数据,并进行打印输出,以此展示了侧输出流的分流及获取数据的完整流程。实际应用场景中,这些侧输出流的数据可以进一步对接不同的业务逻辑进行相应处理,比如奇数流进行一种聚合计算,偶数流进行另一种统计分析等。
这样,利用侧输出流的特性,我们可以很灵活地根据自定义条件对数据进行分流处理,满足多样化的数据处理需求。
四、总结
本文围绕 Apache Flink 转换算子展开,旨在助力读者洞悉其核心要点与多样应用,以灵活处理复杂业务数据。
- 常用转换算子:
- map:基于自定义逻辑,对输入元素逐一变换,如剖析日志字符串、提取关键信息并转换格式,封装为定制对象输出,契合精细化处理需求。
- flatMap:将输入元素按需拆分为零个及以上输出元素,凭借拆分、重组操作,挖掘数据深层价值,适配展开、细化数据场景。
- filter:依设定条件甄别筛选,精准把控数据流向,剔除不符元素,保障下游数据贴合业务关注点。
- keyBy:类比 SQL 分组,依指定键归拢数据,为聚合奠基,以不同方式适配多元数据类型,实现分类统计。
- reduce:聚焦数据集或分组,渐进聚合,可求和、求最值等,sum 操作底层亦仰仗于此,高效整合数据得出汇总结果。
- 合并与连接操作:
- union:整合同类型多流为一,操作简便,唯需留意类型一致,虽不除重但拓宽数据维度。
- connect:桥接不同类型流,借自定义逻辑协同处理,统一输出类型,达成跨流数据交互融合。
- 侧输出流特性:基于自定义规则巧妙分流,借
OutputTag
标记、ProcessFunction
判定,将数据导向不同 “分支”,按需对接各异业务逻辑,于复杂场景中尽显灵活应变优势,全方位满足数据处理多元化诉求。
相关文章:
Flink--API 之Transformation-转换算子的使用解析
目录 一、常用转换算子详解 (一)map 算子 (二)flatMap 算子 (三)filter 算子 (四)keyBy 算子 元组类型 POJO (五)reduce 算子 二、合并与连接操作 …...
火山引擎VeDI在AI+BI领域的演进与实践
随着数字化时代的到来,企业对于数据分析与智能决策的需求日益增强。作为新一代企业级数据智能平台,火山引擎数智平台VeDI基于字节跳动多年的“数据驱动”实践经验,也正逐步在AI(人工智能)与BI(商业智能&…...
java获取docker镜像构建日志
在Java中获取Docker镜像的构建日志,你可以使用Docker Engine API。以下是一个使用OkHttp库的示例代码,用于获取构建日志: import okhttp3.*; import java.io.IOException; public class DockerLogsFetcher { private static final St…...
Spring-boot整合Webservice服务端
Spring Boot整合Webservice服务端 本文是基于前辈一顿吃不饱的文章SpringBoot整合WebService(服务端客户端)-CSDN博客,由于工作需要用.NET调用其他系统发布的WebService服务,尝试用java搭建一个WebService服务端测试一下…...
动静分离具体是怎么实现的?
在 Nginx 中实现动静分离是一种常见的优化手段,用于提高网站的性能和可扩展性。以下是 Nginx 动静分离的一些基本概念和配置方法: 1、什么是动静分离: 动静分离是指将网站的静态资源(如图片、CSS、JavaScript 文件)与…...
如何取出.vmdk文件中的数据
前提:我的云服务器到期了,于是我将云服务器导出了.vmdk镜像。本想在vm虚拟机中启动,但是一直报错。很是苦恼。 首先下载DiskGenius这个软件。 点击磁盘-》打开磁盘 打开.vmdk文件 可以看到内部的文件了,可以选择对应文件导出到桌…...
Vue2中 vuex 的使用
1.安装 vuex 安装vuex与vue-router类似,vuex是一个独立存在的插件,如果脚手架初始化没有选 vuex,就需要额外安装。 yarn add vuex3 或者 npm i vuex3 233 Vue2 Vue-Router3 Vuex3 344 Vue3 Vue-Router4 Vuex4 2. 新建 store/index.j…...
Swift 数据类型
Swift 数据类型 Swift 是一种强类型语言,这意味着在 Swift 中声明的每个变量和常量都必须具有明确的类型。Swift 的类型系统旨在帮助开发者编写清晰、安全的代码。本文将详细介绍 Swift 中的基本数据类型,包括整数、浮点数、布尔值、字符和字符串。 整…...
【pyspark学习从入门到精通22】机器学习库_5
训练-验证分割 TrainValidationSplit 模型为了选择最佳模型,会对输入数据集(训练数据集)进行随机分割,分成两个子集:较小的训练子集和验证子集。分割只执行一次。 在这个例子中,我们还将使用 ChiSqSelect…...
Zookeeper3.5.8集群部署
环境说明 准备三台服务器,我这边是虚拟机,分别为:bigdata141、bigdata142、bigdata143 下载安装包 下载链接:Index of /dist/zookeeper/zookeeper-3.5.8 下载完后,上传到其中一台服务器,我这边上传到 b…...
Linux 无图形界面磁盘空间排查与优化实践20241127
Linux 无图形界面磁盘空间排查与优化实践 引言:磁盘空间问题的痛点与挑战 🔍 常见问题 当系统磁盘空间超过 90% 时,不仅可能导致性能下降,还可能让关键操作无法正常完成。这种情况下,如何高效且精准地排查磁盘占用来…...
TCP socket api详解 续
文章目录 守护进程怎么做到?setsid返回值 dev/null字符文件 daemonTCP协议 退出的时候呢? 会话有很多后台任务,bash肯定会退,那后台会话怎么办呢? 理论上也要退的,但实际上关了bash,bash肯定要…...
一道经典的整数划分题——分弹珠
CSDN 博客:一道经典的整数划分题——分弹珠 一、题目描述 这道题目是一道经典的整数划分问题,要求将 (M) 个弹珠分到 (N) 个盘子中,满足以下条件: 允许盘子为空。两种分法被认为相同当且仅当分配的弹珠数量相同(不考…...
浏览器缓存与协商缓存
1. 强缓存(Strong Cache) 定义 强缓存是指在缓存的资源有效期内,浏览器会直接使用缓存中的数据,而不会发起网络请求。也就是说,浏览器会直接从本地缓存读取资源,不会与服务器进行任何交互。 如何控制强缓…...
Maven 如何配置忽略单元测试
在使用 Maven 进行项目构建时,有时您可能希望跳过测试阶段。 这在确保代码更改不影响测试结果或需要快速部署项目的情况下特别有用。 Maven 提供了多种方法来在构建过程中跳过测试。 为什么跳过测试? 加速构建:对于具有大量测试用例的大项…...
哪里能找到好用的动物视频素材 优质网站推荐
想让你的短视频增添些活泼生动的动物元素?无论是搞笑的宠物瞬间,还是野外猛兽的雄姿,这些素材都能让视频更具吸引力。今天就为大家推荐几个超实用的动物视频素材网站,不论你是短视频新手还是老手,都能在这些网站找到心…...
Python中的23种设计模式:详细分类与总结
设计模式是解决特定问题的通用方法,分为创建型模式、结构型模式和行为型模式三大类。以下是对每种模式的详细介绍,包括其核心思想、应用场景和优缺点。 一、创建型模式(Creational Patterns) 创建型模式关注对象的创建࿰…...
研0找实习【学nlp】14--BERT理解
以后做项目,一定要多调查,选用不同组合关键词多搜索! BERT论文解读及情感分类实战_bert模型在imdb分类上的准确率已经到达了多少的水平-CSDN博客 【深度学习】-Imdb数据集情感分析之模型对比(4)- CNN-LSTM…...
【AI日记】24.11.27 学习 kaggle 入门比赛 Titanic - Machine Learning from Disaster
【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】 核心工作 内容:学习 kaggle 入门比赛 Titanic - Machine Learning from Disaster时间:8 小时心得:在学习别人的 notebook 的时候,碰到不懂的知识点,…...
HCIP——堆叠技术实验配置
目录 一、堆叠的理论知识 二、堆叠技术实验配置 三、总结 一、堆叠的理论知识 1.1堆叠概述: 是指将两台交换机通过堆叠线缆连接在一起,从逻辑上变成一台交换设备,作为一个整体参与数据的转发。 1.2堆叠的基本概念 堆叠系统中所有的单台…...
trtllm 部署新体验
实验清华大模型和trtllm Chatglm3 pip3 install tensorrt_llm -U --pre --extra-index-url https://pypi.nvidia.com 要安装git来下载仓库 使用这个chatglm的例子 安装依赖 用最新的glm3的model 然后开始转换model 官方写错了,这应该是个-,不是_&a…...
部署 DeepSpeed以推理 defog/sqlcoder-70b-alpha 模型
部署 DeepSpeed 以推理 defog/sqlcoder-70b-alpha 这样的 70B 模型是一个复杂的过程,涉及多个关键步骤。下面是详细的步骤,涵盖了从模型加载、内存优化到加速推理的全过程。 1. 准备环境 确保你的环境配置正确,以便能够顺利部署 defog/sqlc…...
node.js基础学习-http模块-创建HTTP服务器、客户端(一)
http模块式Node.js内置的模块,用于创建和管理HTTP服务器。Node.js使用JavaScript实现,因此性能更好。 使用http模块创建服务器,我们建议使用commonjs模块规范,因为很多第三方的组件都使用了这种规范。当然es6写法也支持。 下面就是…...
Cobalt Strike 4.8 用户指南-第十一节 C2扩展
11.1、概述 Beacon 的 HTTP 指标由 Malleable Command and Control (Malleable C2) 配置文件控制。Malleable C2 配置文件是一个简单的程序,它指定如何转换数据并将其存储在事务中。转换和存储数据的同一程序(向后解释࿰…...
STM32 使用ARM Compiler V6 编译裸机 LWIP协议栈报错的解决方法
在lwip 的cc.h 中使用以下宏定义,来兼容 V5 和 V6编译器 #if defined (__ARMCC_VERSION) && (__ARMCC_VERSION > 6010050) /* ARM Compiler V6 */ #define __CC_ARM /* when use v6 compiler define this */ #endifV6编译的速度确实比V5块了好多倍。 …...
K8S简介、使用教程
以下是关于 Kubernetes(通常缩写为 K8S)的简介和使用教程: 一、Kubernetes 简介 定义与作用 Kubernetes 是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。它最初由谷歌开发,后捐赠给云原生计算基…...
Scala—Map用法详解
Scala—Map用法详解 在 Scala 中,Map 是一种键值对的集合,其中每个键都是唯一的。Scala 提供了两种类型的 Map:不可变 Map 和可变 Map。 1. 不可变集合(Map) 不可变 Map 是默认的 Map 实现,位于 scala.co…...
2024御网杯信息安全大赛个人赛wp(misc方向)
目录 一.信息安全大赛的通知二、编码转换1. 第一部分2. 第二部分3. 第三部分 三、1.txt四、buletooth 题目附件以及工具链接: 通过网盘分享的文件:御网杯附件 链接: https://pan.baidu.com/s/1LNA6Xz6eZodSV0Io9jGSZg 提取码: jay1 –来自百度网盘超级会…...
DeepSpeed 配置文件(DeepSpeed Configuration Files)详解:中英文解释
中文版 本文详细介绍 DeepSpeed 配置文件,结合 4 卡 3090 的实际使用场景,重点解释各个参数的含义,并提供应对爆显存的方案。 DeepSpeed 配置文件详解:从基础到实战 DeepSpeed 是用于加速大规模分布式训练的重要工具,…...
AI 助力开发新篇章:云开发 Copilot 深度体验与技术解析
本文 一、引言:技术浪潮中的个人视角1.1 AI 和低代码的崛起1.2 为什么选择云开发 Copilot? 二、云开发 Copilot 的核心功能解析2.1 自然语言驱动的低代码开发2.1.1 自然语言输入示例2.1.2 代码生成的模块化支持 2.2 实时预览与调整2.2.1 实时预览窗口功能…...
QTableWidget使用代理绘制分行显示
在这里插入代码片# 创建主窗口类: 使用 QTableWidget 作为核心控件。 设置表头及行列信息。 自定义代理: 继承 QStyledItemDelegate,实现代理模式。 重写 paint 和 sizeHint 方法,支持多行文本绘制。 设置行高以适应多行显示。 …...
Linux系统之fuser命令的基本使用
Linux系统之fuser命令的基本使用 一、fuser命令介绍二、fuser命令使用帮助2.1 help帮助信息2.1 基本语法①通用选项②文件/设备相关选项③网络相关选项④进程操作选项⑤其他选项 三、fuser命令的基本使用3.1 查找挂载点的进程3.2 查看指定设备进程信息3.3 查找监听特定端口的进…...
解决`-bash: ./configure:/bin/sh^M:解释器错误: 没有那个文件或目录`的问题
解决`-bash: ./configure:/bin/sh^M:解释器错误: 没有那个文件或目录`的问题 一、错误原因分析二、解决方法方法一:使用`dos2unix`工具方法二:使用`sed`命令方法三:使用`tr`命令方法四:在文本编辑器中转换方法五:在Windows系统中使用适当的工具三、预防措施四、总结在使…...
【时时三省】(C语言基础)结构体的声明
山不在高,有仙则名。水不在深,有龙则灵。 ----CSDN 时时三省 结构的基础知识 结构是一些值的集合,这些值称为成员变量。结构的每个成员可以是不同类型的变量。 数组是一组相同类型的元素的集合 结构体也是一些值的集合,结构的每…...
群聊前选择患者功能的实现
和普通群聊不同,开启一个图文会话聊天,必须先选择患者、团队、医生。 原来是集成到腾讯IM当中,现在需要单独写一个页面 原来的代码在这里: const handleShow () > {uni.navigateTo({url: /pageB/active-home/active-home})}…...
目标检测,图像分割,超分辨率重建
目标检测和图像分割 目标检测和图像分割是计算机视觉中的两个不同任务,它们的输出形式也有所不同。下面我将分别介绍这两个任务的输出。图像分割又可以分为:语义分割、实例分割、全景分割。 语义分割(Semantic Segmentation)&…...
关于 EKS Bottlerocket AMI 版本与 Karpenter 配置的说明
问题1: Bottlerocket AMI 版本问题 之前,后端团队发现在使用 Bottlerocket v1.26.2 AMI 版本时,存在某些问题。经过 Bottlerocket 团队调查,此行为是罕见的 race condition 导致的结果。 我们在环境中重现了此状况,并且关注到由于 kubelet device manager 的启动时间晚于 NVI…...
安全设备-日志审计-网络路由配置
1 网络设置 菜单项‘网络’‘网络设置’子项。进入网卡列表展示页面。点击操作列‘编辑’图标,可编辑对应网卡信息。 图11-1 网卡信息 设备接口的 GE0/1 接口IP:192.168.0.1子网掩码:255.255.255.0 图11-2 配置网卡 1 IP地址2 子网掩码3 主机DNS4 备份DNS 2 通…...
Linux服务器生成SSH 密钥对与 GitLab 仓库进行交互
目录 生成 SSH 密钥对 将公钥添加到 GitLab 测试 SSH 连接 生成 SSH 密钥对 在执行脚本的机器上打开终端,执行以下命令(假设使用默认的 RSA 算法,一路回车使用默认设置即可,也可以根据需要指定其他算法和参数)&…...
react 的路由功能
1. 安装依赖 pnpm add react-router-dom 2. 基本的路由设置(BrowserRouter) 在 main.tsx 入口文件中使用BrowserRouter组件来包裹整个应用。它会监听浏览器的 URL 变化。 import { StrictMode } from "react";import { createRoot } from …...
vscode python code runner执行乱码
打开vscode code runner插件配置,如图所示: 然后在setting.json修改运行python的默认命令: 将原来 替换成 "python":"set PYTHONIOENCODINGutf8 && python", 参考:Vscode——python环境输出中文乱…...
Element UI 打包探索【3】
目录 第九个命令 node build/bin/gen-cssfile gulp build --gulpfile packages/theme-chalk/gulpfile.js cp-cli packages/theme-chalk/lib lib/theme-chalk 至此,dist命令完成。 解释why Element UI 打包探索【1】里面的why Element UI 打包探索【2】里面…...
windows使用docker安装centos7
参考文章:docker容器安装CentOS7.9 需要指出来的步骤 2.5 安装常用工具及ssh服务 由于centos7不维护,需要更换镜像源才能正常使用yum install命令安装 更换镜像源文章:CentOS 7配置yum镜像源 2.9Xshell远程连接docke_centos7.9 文章没有指…...
写一个流程,前面的圆点和线,第一个圆上面没有线,最后一个圆下面没有线
上图 最近写类似于这种的还挺多的,记录一下css方法 遍历列表之后 <div class"item" v-for"(item,index) in recordList"> 加这样一个盒子 <div class"timeline"> <div class"line1" v-if"index ! 0&…...
Javascript Insights: Visualizing Var, Let, And Const In 2024
11/2024 出版 MP4 |视频:h264, 19201080 |音频:AAC,44.1 KHz 语言:英语 |大小: 2.96 GB |时长: 5 小时 34 分钟 为所有认真的 JavaScript 开发人员可视化与 VAR、LET、CONST 和 EXECUTON CONTE…...
【工具】AI 工具集整理推荐
ai工具集 我私人使用了一段时间,效果不错,有很多AI工具,可以提升工作的效率。...
如何通过终端连接无线网
1 先连接对方服务器 ssh root192.168.3.219 # root是用户名字 192.168.3.219是对方的ip地址2 开启 WiFi:输入sudo nmcli r wifi on,开启系统的无线网络功能 sudo nmcli r wifi on3 扫描附近的 WiFi 热点:执行sudo nmcli dev wifi&#x…...
elasticsearch报错fully-formed single-node cluster with cluster UUID
1.问题描述 k8s集群内部署的es中间件起不来,查看日志发现如下警告,节点发现功能开启,但是目前我是单节点服务,所以尝试编辑sts将节点发现功能去掉或者在部署时将你的sts的yaml文件和chart文件修改重新部署以去掉该功能 {"t…...
前端 vue3 + element-plus + ts 对话框示例
【父组件】:SampleInput.vue,局部代码片段 引入子组件 ApplyItemChooseDialog.vue,定义变量,用于渲染和显示标识 <script>片段代码 import ApplyItemChooseDialog from "/views/accept/ApplyItemChooseDialog.vue&q…...
南京移动“智慧+关怀”服务体系助力老年群体生活安全有保障
在数字化浪潮汹涌澎湃的当下,江苏移动南京分公司秉持“人民邮电为人民”的服务理念,推出一系列创新服务举措,为社区老年群体提供贴心、便捷的数字服务,让老人在享受科技发展成果的同时,感受到社会的温暖与关怀。 贴心…...