Flink学习连载文章11--双流Join
双流 Join 和两个流合并是不一样的
两个流合并:两个流变为 1 个流 union connect
双流 join: 两个流 join,其实这两个流还是原来的,只是满足条件的数据会变为一个新的流。
可以结合 sql 语句中的 union 和 join 的区别。
在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:
- join -- 类似于我们以前学过的内连接 inner join
- coGroup -- 类似于我们以前学过的 内连接,左连接,右连接
- intervalJoin -- 一个流中的数据可以关联另一个流中一个时间段的所有数据
下面我们分别详细看一下这3个算子是如何实现双流 Join 的。
1. Join
Joining | Apache Flink
Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。
Join 可以支持处理时间(processing time)和事件时间(event time)两种时间特征。
Join 通用用法如下:
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。
下面我们看一下 Join 算子在不同类型窗口上的具体表现。
1.1 滚动窗口Join
当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。
如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。
下面我们一起看一下如何实现上图所示的滚动窗口 Join:
可以通过两个socket流,将数据合并为一个三元组,key,value1,value2
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.join(orangeStream).where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}
总结非常重要:
1) 要想测试这个效果,需要将并行度设置为1
2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。
假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:
假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:
2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08
为什么呢? 水印时间 >= 结束时间
水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05
时间窗口如果是 00~05 ,05 这个时间点的数据是不包含在本窗口内的,而是归于下一个窗口,所谓的前包后不包。
如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:
绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即可,不用深究 ]
当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。
如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。
下面我们一起看一下如何实现上图所示的滑动窗口 Join:
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @基本功能: 演示join的滑动窗口* @program:FlinkDemo* @author: 闫哥* @create:2024-05-20 09:11:13**/
public class Demo02Join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 将并行度设置为1,否则很难看到现象env.setParallelism(1);// 创建一个绿色的流DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream = greenSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));// 创建一个橘色的流DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);// key,0,2021-03-26 12:09:00 将它变为三元组SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream = orangeSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {// 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。String time = element.f2; //2021-03-26 12:09:00SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date date = sdf.parse(time);return date.getTime();} catch (ParseException e) {throw new RuntimeException(e);}}}));//2. source-加载数据//3. transformation-数据处理转换DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream).where(tuple3 -> tuple3.f0).equalTo(tuple3 -> tuple3.f0).window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {return Tuple3.of(first.f0, first.f1, second.f1);}});//4. sink-数据输出greenDataStream.print("绿色的流:");orangeDataStream.print("橘色的流:");resultStream.print("最终的结果:");//5. execute-执行env.execute();}
}
假设输入流为 格式,两条流输入元素如下所示:
绿色流:
key,0,2021-03-26 12:09:00
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09
2. CoGroup
CoGroup 算子是将两条数据流按照 Key 进行分组,然后将相同 Key 的数据进行处理。要实现 CoGroup 功能需要为两个输入流分别指定 KeySelector 和 WindowAssigner。它的调用方式类似于 Join 算子,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流或者右流的数据,基于此我们可以实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。
目前,这些分组中的数据是在内存中保存的,因此需要确保保存的数据量不能太大,否则,JVM 可能会崩溃。
CoGroup 通用用法如下:
stream.coGroup(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<CoGroupFunction>);
下面我们看一下如何使用 CoGroup 算子实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。
最大的优势是可以实现内连接,左连接,右连接,但是缺点是内存压力大,而上面的join只能实现内连接。
CoGroup 从写法上,是coGroup 和 join的区别,而且apply 里面的函数也是不一样的,一定要注意观察。
2.1 InnerJoin
下面我们看一下如何使用 CoGroup 实现内连接:
如上图所示,我们定义了一个大小为 2 秒的滚动窗口。InnerJoin 只有在两个流对应窗口中都存在元素时,才会输出。
我们以滚动窗口为例来实现 InnerJoin
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuCoGroupDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换CoGroupedStreams<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>> coGroup = greenStream.coGroup(orangeStream);coGroup.where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> i1, Iterable<Tuple3<String, Integer, String>> i2, Collector<String> collector) throws Exception {// 凭借这两个迭代器实现内连接,左右连接// 内连接 外面这个循环和里面的循环必须都有数据才会进行输出,典型的内连接for (Tuple3<String, Integer, String> t1 : i1) {for (Tuple3<String, Integer, String> t2 : i2) {collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+t2.f1);}}}}).print();//5. execute-执行env.execute();}}
如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 InnerJoin ,只需要两个迭代器中的元素两两组合即可。两条流输入元素如下所示:
绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
2.2 LeftJoin
下面我们看一下如何使用 CoGroup 实现左连接:
如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要绿色流窗口中有元素时,就会输出。即使在橘色流对应窗口中没有相对应的元素。
我们以滚动窗口为例来实现 LeftJoin
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuCoGroupLeftDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换CoGroupedStreams<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>> coGroup = greenStream.coGroup(orangeStream);coGroup.where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void coGroup(Iterable<Tuple3<String, Integer, String>> i1, Iterable<Tuple3<String, Integer, String>> i2, Collector<String> collector) throws Exception {// 凭借这两个迭代器实现内连接,左右连接// 内连接for (Tuple3<String, Integer, String> t1 : i1) {boolean noEelement = true;for (Tuple3<String, Integer, String> t2 : i2) {noEelement = false;collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+t2.f1);}if(noEelement){collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+null);}}}}).print();//5. execute-执行env.execute();}}
如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 green Iterable,橘色流为 orange Iterable,如果要实现 LeftJoin ,需要保证 orange Iterable 中没有元素,green Iterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 orange Iterable 是否有元素,如果 orange Iterable 中没有元素,单独输出 greenIterable 中的元素即可。Join 效果如下所示:
2.3 RightJoin
下面我们看一下如何使用 CoGroup 实现右连接:
如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要橘色流窗口中有元素时,就会输出。即使在绿色流对应窗口中没有相对应的元素。
我们以滚动窗口为例来实现 RightJoin
// Join流
CoGroupedStreams coGroupStream = greenStream.coGroup(orangeStream);
DataStream<String> result = coGroupStream
// 绿色流
.where(new KeySelector<Tuple3<String, String, String>, String>() {@Overridepublic String getKey(Tuple3<String, String, String> tuple3) throws Exception {return tuple3.f0;}
})
// 橘色流
.equalTo(new KeySelector<Tuple3<String, String, String>, String>() {@Overridepublic String getKey(Tuple3<String, String, String> tuple3) throws Exception {return tuple3.f0;}
})
// 滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.apply(new RightJoinFunction());// 右连接
private static class RightJoinFunction implements CoGroupFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String> {@Overridepublic void coGroup(Iterable<Tuple3<String, String, String>> greenIterable, Iterable<Tuple3<String, String, String>> orangeIterable, Collector<String> collector) throws Exception {for (Tuple3<String, String, String> orangeTuple : orangeIterable) {boolean noElements = true;for (Tuple3<String, String, String> greenTuple : greenIterable) {noElements = false;LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",greenTuple.f0, greenTuple.f1 + ", " + orangeTuple.f1, greenTuple.f2 + ", " + orangeTuple.f2);collector.collect(greenTuple.f1 + ", " + orangeTuple.f1);}if (noElements) {LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",orangeTuple.f0, "null, " + orangeTuple.f1, "null, " + orangeTuple.f2);collector.collect("null, " + orangeTuple.f2);}}}
}
如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 RightJoin,实现原理跟 LeftJoin 一样,需要保证 greenIterable 中没有元素,orangeIterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 greenIterable 是否有元素,如果 greenIterable 中没有元素,单独输出 orangeIterable 中的元素即可。
3. Interval Join
Interval Join 不同于 Join以及CoGroup 原因是 Join和CoGroup 他们是窗口Join ,必须给定窗口的 ,Interval Join不需要给窗口。Interval Join 必须先分组才能使用。
Flink 中基于 DataStream 的 Join,只能实现在同一个窗口的两个数据流进行 Join,但是在实际中常常会存在数据乱序或者延时的情况,导致两个流的数据进度不一致,就会出现数据跨窗口的情况,那么数据就无法在同一个窗口内 Join。Flink 基于 KeyedStream 提供的 Interval Join 机制可以对两个keyedStream 进行 Join, 按照相同的 key 在一个相对数据时间的时间段内进行 Join。按照指定字段以及右流相对左流偏移的时间区间进行关联:
b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]
或者
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
其中a和b分别是上图中绿色流和橘色流中的元素,并且有相同的 key。只需要保证 lowerBound 永远小于等于 upperBound 即可,均可以为正数或者负数。
从上面可以看出绿色流可以晚到 lowerBound(lowerBound为负的话)时间,也可以早到 upperBound(upperBound为正的话)时间。也可以理解为橘色流中的每个元素可以和绿色流中指定区间的元素进行 Join。需要注意的是 Interval Join 当前仅支持事件时间(EventTime):
public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {if (timeBehaviour != TimeBehaviour.EventTime) {throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");}
}
下面我们具体看看如何实现一个 Interval Join:
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-27 09:31:57**/
public class _ShuangLiuIntervalJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.keyBy(tup -> tup.f0).intervalJoin(orangeStream.keyBy(tup -> tup.f0)).between(Time.seconds(-2),Time.seconds(1)).process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {@Overridepublic void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}
需要注意的是 Interval Join 当前仅支持事件时间(EventTime),所以需要为流指定事件时间戳(毫秒值)。
两条流输入元素如下所示:
绿色流:
c,0,2021-03-23 12:09:00
c,1,2021-03-23 12:09:01
c,6,2021-03-23 12:09:06
c,7,2021-03-23 12:09:07橘色流:
c,0,2021-03-23 12:09:00
c,2,2021-03-23 12:09:02
c,3,2021-03-23 12:09:03
c,4,2021-03-23 12:09:04
c,5,2021-03-23 12:09:05
c,7,2021-03-23 12:09:07
总结:
join、coGroup 都是基于窗口的join, join算子本身只支持内连接
coGroup 只可以实现内连接,左右连接
intervalJoin 是一个范围join, 一个数据流上的某一时刻数据可以join 另外一个数据流上的某一范围的数据。跟窗口无关。
相关文章:
Flink学习连载文章11--双流Join
双流 Join 和两个流合并是不一样的 两个流合并:两个流变为 1 个流 union connect 双流 join: 两个流 join,其实这两个流还是原来的,只是满足条件的数据会变为一个新的流。 可以结合 sql 语句中的 union 和 join 的区别。 在离线 Hive 中&…...
RayLink远程控制技术助力教育领域的创新应用研究
远程控制技术在教育领域的应用确实改变了传统教学模式。想象一下,无论身处何地,只要有网络连接,就能通过远程软件加入课堂,这种体验是不是很吸引人?虽然远程控制听起来很技术化,但别担心,小编今…...
【Qt移植LVGL】QWidget手搓LVGL软件仿真模拟器(非直接运行图形库)
【Qt移植LVGL】QWidget手搓LVGL软件仿真模拟器(非直接运行图形库) 打包开源地址: Qt函数库gitee地址 更新以gitee为准 移植后的demo工程: gitee 有些没实现的 后续我会继续优化 文章目录 别碰瓷看清楚:是移植&#…...
PostgreSQL UNION 操作符
PostgreSQL UNION 操作符 引言 PostgreSQL 是一种功能强大的开源对象关系型数据库管理系统,它以其稳定性、可靠性和先进的特性而闻名。在处理数据库查询时,我们经常需要合并来自不同表的数据,或者合并同一表的不同查询结果。这时,UNION 操作符就变得非常有用。本文将详细…...
spring boot 测试 mybatis mapper类
spring boot 测试 mybatis mapper类 针对 mybatis plus不启动 webserver指定加载 xml 【过滤 “classpath*:/mapper/**/*.xml” 下的xml】, mapper xml文件名和mapper java文件名称要一样,是根据文件名称过滤的。默认情况加载和解析所有mapper.xml 自定义 MapperT…...
Python发kafka消息
要在Python中向Kafka发送消息,你可以使用kafka-python库。首先,你需要安装这个库。如果你还没有安装它,可以通过pip来安装: bash pip install kafka-python 接下来是一个简单的例子,展示如何创建一个生产者࿰…...
zipkin 引申一:如何遍历jar目录下的exec.jar并选择对应序号的jar启动
文章目录 一、Zipin概述二、如何下载三、需求描述四、代码实现1. pom设置2. 相关工具类3. 核心代码 五、打包部署1. 打包,在项目目录执行mvn clean package2. 部署3. 运行以及停止 六、源码放送 一、Zipin概述 Zipkin是Twitter开源的分布式跟踪系统,基于…...
使用 httputils + protostuff 实现高性能 rpc
1、先讲讲 protobuf protobuf 一直是高性能序列化的代表之一(google 出品)。但是用起来,可难受了,你得先申明 “.proto” 配置文件,并且要把这个配置文件编译转成类。所以必然要学习新语法、新工具。 可能真的太难受…...
Facebook广告文案流量秘诀
Facebook 广告文案是制作有效 Facebook 广告的关键方面。它侧重于伴随广告视觉元素的文本内容。今天我们的博客将深入探讨成功的 Facebook 广告文案的秘密! 一、广告文案怎么写? 正文:这是帖子的正文,出现在您姓名的正下方。它可…...
在Vue.js中生成二维码(将指定的url+参数 生成二维码)
在Vue.js中生成二维码,你可以使用JavaScript库如qrcode或qr.js。以下是一个简单的例子,展示如何在Vue组件中使用qrcode库将指定的URL加上参数生成二维码。 首先,你需要安装qrcode库。如果你使用的是npm或yarn,可以通过命令行安装…...
Face2QR:可根据人脸图像生成二维码,还可以扫描,以后个人名片就这样用了!
今天给大家介绍的是一种专为生成个性化二维码而设计的新方法Face2QR,可以将美观、人脸识别和可扫描性完美地融合在一起。 下图展示为Face2QR 生成的面部图像(第一行)和二维码图像(第二行)。生成的二维码不仅忠实地保留…...
【Linux网络编程】第六弹---构建TCP服务器:从基础到线程池版本的实现与测试详解
✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】【Linux网络编程】 目录 1、TcpServerMain.cc 2、TcpServer.hpp 2.1、TcpServer类基本结构 2.2、构造析构函数 2.3、InitServer(…...
XML 语言随笔
XML的含义 XML(eXtensible Markup Language,可扩展标记语言)是一种用于存储和传输数据的标记语言。XML与HTML(HyperText Markup Language,超文本标记语言)类似,但XML的设计目的是描述数据&…...
flex: 1 display:flex 导致的宽度失效问题
flex: 1 & display:flex 导致的宽度失效问题 问题复现 有这样的一个业务场景,详情项每行三项分别占33%宽度,每项有label字数不固定所以宽度不固定,还有content 占满标签剩余宽度,文字过多显示省略号, 鼠标划入展示…...
65页PDF | 企业IT信息化战略规划(限免下载)
一、前言 这份报告是企业IT信息化战略规划,报告详细阐述了企业在面对新兴技术成熟和行业竞争加剧的背景下,如何通过三个阶段的IT战略规划(IT 1.0基础建设、IT 2.0运营效率、IT 3.0持续发展),系统地构建IT管理架构、应…...
15.三数之和
给你一个整数数组 nums ,判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k ,同时还满足 nums[i] nums[j] nums[k] 0 。请你返回所有和为 0 且不重复的三元组。 注意:答案中不可以包含重复的三元组。 示例 1&am…...
【Notepad++】---设置背景为护眼色(豆沙绿)最新最详细
在编程的艺术世界里,代码和灵感需要寻找到最佳的交融点,才能打造出令人为之惊叹的作品。而在这座秋知叶i博客的殿堂里,我们将共同追寻这种完美结合,为未来的世界留下属于我们的独特印记。 【Notepad】---设置背景为护眼色…...
项目代码第2讲:从0实现LoginController.cs,UsersController.cs、User相关的后端接口对应的前端界面
一、User 1、使用数据注解设置主键和外键 设置主键:在User类的U_uid属性上使用[Key]注解。 设置外键:在Order类中,创建一个表示外键的属性(例如UserU_uid),并使用[ForeignKey]注解指定它引用User类的哪个…...
电子商务人工智能指南 3/6 - 聊天机器人和客户服务
介绍 81% 的零售业高管表示, AI 至少在其组织中发挥了中等至完全的作用。然而,78% 的受访零售业高管表示,很难跟上不断发展的 AI 格局。 近年来,电子商务团队加快了适应新客户偏好和创造卓越数字购物体验的需求。采用 AI 不再是一…...
vue.js组件开发的所有流程
1. 设计组件架构 首先,你需要考虑应用的结构,决定哪些部分应该成为独立的组件。这包括: 提取重复的UI元素:如按钮、输入框、卡片等。 功能模块:例如登录框、导航栏、数据表格等。 2. 设置开发环境 安装Node.js…...
从零开始学TiDB(1) 核心组件架构概述
首先TiDB深度兼容MySQL 5.7 1. TiDB Server SQL语句的解析与编译:首先一条SQL语句最先到达的地方是TiDB Server集群,TiDB Server是无状态的,不存储数据,SQL 发过来之后TiDB Server 负责 解析,优化,编译 这…...
VsCode运行Ts文件
1. 生成package.json文件 npm init 2. 生成tsconfig.json文件 tsc --init 3. Vscode运行ts文件 在ts文件点击右键执行Run Code,执行ts文件...
初始化webpack应用示例
1、初始化npm mkdir webpack_test cd webpack_test npm init 2、安装webpack依赖 npm install webpack webpack-cli -D 3、添加文件夹,入口文件 mkdir src touch index.js touch add-content.js 文件夹结构 index.js import addContent from "./add-cont…...
liunx docker 部署 nacos seata sentinel
部署nacos 1.按要求创建好数据库 2.创建docker 容器 docker run -d --name nacos-server -p 8848:8848 -e MODEstandalone -e SPRING_DATASOURCE_PLATFORMmysql -e MYSQL_SERVICE_HOST172.17.251.166 -e MYSQL_SERVICE_DB_NAMEry-config -e MYSQL_SERVICE_PORT3306 -e MYSQL…...
MySQL面试
文章目录 事务隔离级别需要解决的问题事务隔离级别 MySQL 中是如何实现事务隔离的实现可重复读 什么是存储引擎如何定位慢查询分析慢查询原因MySQL超大分页怎么处理索引失效什么时候建立唯一索引、前缀索引、联合索引?redolog与binlog是如何保证一致的redolog刷盘时…...
linux运维之shell编程
Shell 编程在系统运维中及其重要 1. Shell 编程概述 Shell 是一种命令行解释器,能够执行操作系统的命令。Shell 脚本是一个包含一系列 Shell 命令的文件,它可以被执行,以自动化和批量处理任务。常用的 Shell 类型包括 bash、sh、zsh 等。Shel…...
ssm 多数据源 注解版本
application.xml 配置如下 <!-- 使用 DruidDataSource 数据源 --><bean id"primaryDataSource" class"com.alibaba.druid.pool.DruidDataSource" init-method"init" destroy-method"close"></bean> <!-- 使用 数…...
Nginx核心配置详解
一、配置文件说明 nginx官方帮助文档:nginx documentation nginx的配置文件的组成部分: 主配置文件:nginx.conf子配置文件: include conf.d/*.conffastcgi, uwsgi,scgi 等协议相关的配置文件mime.types:…...
十六(AJAX3)、XMLHttpRequest、Promise、简易axios封装、案例天气预报、lodash-debounce防抖
1. XMLHttpRequest 1.1 XMLHttpRequest-基本使用 /* 定义:XMLHttpRequest(XHR)对象用于与服务器交互。通过 XMLHttpRequest 可以在不刷新页面的情况下请求特定 URL,获取数据。这允许网页在不影响用户操作的情况下,更…...
12.06 深度学习-预训练
# 使用更深的神经网络 经典神经网络 import torch import cv2 from torchvision.models import resnet18,ResNet18_Weights from torch import optim,nn from torch.utils.data import DataLoader from torchvision.datasets import CIFAR10 from torchvision import tr…...
【计算机网络】期末速成(2)
部分内容来源于网络,侵删~ 第五章 传输层 概述 传输层提供进程和进程之间的逻辑通信,靠**套接字Socket(主机IP地址,端口号)**找到应用进程。 传输层会对收到的报文进行差错检测。 比特流(物理层)-> 数据帧(数据链路层) -> 分组 / I…...
Python学习笔记10-作用域
作用域 定义:Python程序程序可以直接访问命名空间的正文区域 作用:决定了哪一部分区域可以访问哪个特定的名称 分类: 局部作用域(Local)闭包函数外的函数中(Enclosing)全局作用域࿰…...
Sui 主网升级至 V1.38.3
Sui 主网现已升级至 V1.38.3 版本,同时协议升级至 69 版本。请开发者及时关注并调整! 其他升级要点如下所示: 协议 #20199 在共识快速路径投票中设置允许的轮次数量。 节点(验证节点与全节点) #20238 为验证节点…...
linux的vdagent框架设计
1、vdagent Linux 的 spice 客户代理由两部分组成,一个系统范围的守护进程 spice-vdagentd 和一个 X11 会话代理 spice-vdagent,每个 X11 会话有一个。spice-vdagentd 通过 Sys-V initscript 或 systemd 单元启动。 如下图:spice-vdagent&a…...
vue3+elementPlus封装的一体表格
目录结构 源码 exportOptions.js export default reactive([{label: 导出本页,key: 1,},{label: 导出全部,key: 2,}, ])index.vue <template><div class"flex flex-justify-between flex-items-end"><div><el-button-group><slot name…...
判断是否 AGP7+ 的方法
如何判断? /*** 是否是AGP7.0.0及以上* param project* return*/static boolean isAGP7_0_0(Project project) {def androidComponents project.extensions.findByName("androidComponents")if (androidComponents && androidComponents.hasProp…...
使用 Streamlit +gpt-4o实现有界面的图片内容分析
在上一篇利用gpt-4o分析图像的基础上,进一步将基于 Python 的 Streamlit 库,结合 OpenAI 的 API,构建一个简洁易用的有界面图片内容分析应用。通过该应用,用户可以轻松浏览本地图片,并获取图片的详细描述。 调用gpt-4o…...
树莓集团是如何链接政、产、企、校四个板块的?
树莓集团作为数字影像行业的积极探索者与推动者,我们通过多维度、深层次的战略举措,将政、产、企、校四个关键板块紧密链接在一起,实现了资源的高效整合与协同发展,共同为数字影像产业的繁荣贡献力量。 与政府的深度合作政府在产业…...
Fyne ( go跨平台GUI )中文文档-Fyne总览(二)
本文档注意参考官网(developer.fyne.io/) 编写, 只保留基本用法 go代码展示为Go 1.16及更高版本,ide为goland2021.2??????? ?这是一个系列文章: Fyne ( go跨平台GUI )中文文档-入门(一)-CSDN博客 Fyne ( go跨平台GUI )中文文档-Fyne总览(二)-CSDN博客 Fyne…...
MySQL数据库(3)-SQL基础语言学习
1. DDL数据定义语言 1.1 什么是DDL DDL(Data Definition Language,数据定义语言)是SQL语言的一部分,用于定义和修改数据库结构。DDL主要包括以下三类语句: 1.CREATE:用于创建数据库对象,如数…...
下拉框根据sql数据回显
vue <a-form-item label"XXXX" :labelCol"labelCol" :wrapperCol"wrapperCol" required><a-select v-decorator"[disputeType, validatorRules.disputeType]" style"width: 200px" placeholder"请选择XXXX&q…...
电池销售系统
文末获取源码和万字论文,制作不易,感谢点赞支持。 摘 要 在当今信息爆炸的大时代,由于信息管理系统能够更有效便捷的完成信息的管理,越来越多的人及机构都已经引入和发展以信息管理系统为基础的信息化管理模式,随之信…...
四、镜像构建
四、镜像构建 从镜像大小上来说,一个比较小的镜像只有十几MB,而内核文件需要一百多MB,因此镜像里面是没有内核的,镜像是在被启动为容器后直接使用宿主机的内核,而镜像本身则只提供相应的rootfs,即系统正常…...
FastAPI 响应状态码:管理和自定义 HTTP Status Code
FastAPI 响应状态码:管理和自定义 HTTP Status Code 本文介绍了如何在 FastAPI 中声明、使用和修改 HTTP 状态码,涵盖了常见的 HTTP 状态码分类,如信息响应(1xx)、成功状态(2xx)、客户端错误&a…...
C#设计模式--原型模式(Prototype Pattern)
原型模式是一种创建型设计模式,它允许通过复制现有对象来创建新对象,而无需通过构造函数。这种方式可以提高性能,特别是在创建复杂对象时。C# 中可以通过实现 ICloneable 接口或自定义克隆方法来实现原型模式。 案例 1:文档编辑器…...
使用Goland对6.5840项目进行go build出现异常
使用Goland对6.5840项目进行go build出现异常 Lab地址: https://pdos.csail.mit.edu/6.824/labs/lab-mr.html项目地址: git://g.csail.mit.edu/6.5840-golabs-2024 6.5840运行环境: mac系统 goland git clone git://g.csail.mit.edu/6.5840-golabs-2024 6.5840 cd 6.5840/src…...
使用kibana实现es索引的数据映射和索引模版/组件模版
1 数据映射 数据映射官方链接 1.1 日期映射案例 1.创建一条索引。把索引中的字段生日映射为日期,并制定映射后的格式为年月日 PUT http://10.0.0.91:9200/zhiyong18-luckyboy-date {"mappings": {"properties": {"birthday": {&q…...
基于elementui的远程搜索下拉选择分页组件
在开发一个练手项目的时候,需要一个远程搜索的下拉选择组件; elementui自带的el-select支持远程搜索;但如果一次性查询的数据过多;会导致卡顿。故自己实现一个可分页的远程下拉选择组件 效果: 代码: <…...
每日一题 LCR 074. 合并区间
LCR 074. 合并区间 对遍历顺序注意一下就可以 class Solution { public:vector<vector<int>> merge(vector<vector<int>>& intervals) {int n intervals.size();sort(intervals.begin(),intervals.end());int idx 0;vector<vector<int&g…...
Flink SQL
Overview | Apache Flink FlinkSQL开发步骤 Concepts & Common API | Apache Flink 添加依赖: <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>…...