【Flink系列】6. Flink中的时间和窗口
6. Flink中的时间和窗口
在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。
所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。
6.1 窗口(Window)
6.1.1 窗口的概念
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。
注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。
6.1.2 窗口的分类
我们在上一节举的例子,其实是最为简单的一种时间窗口。在Flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度,对Flink中内置的窗口做一个分类说明。
1)按照驱动类型分
2)按照窗口分配数据的规则分类
根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。
6.1.3 窗口API概览
1)按键分区(Keyed)和非按键分区(Non-Keyed)
在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。
(1)按键分区窗口(Keyed Windows)
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...).window(...)
(2)非按键分区(Non-Keyed Windows)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。
在代码中,直接基于DataStream调用.windowAll()定义窗口。
stream.windowAll(...)
注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
2)代码中窗口API的调用
窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。
stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)
其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种,我们接下来就详细展开讲解。
6.1.4 窗口分配器
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。
窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner作为参数,返回WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是AllWindowedStream。
窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。
6.1.4.1 时间窗口
时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。
(1)滚动处理时间窗口
窗口分配器由类TumblingProcessingTimeWindows提供,需要调用它的静态方法.of()。
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)
这里.of()方法需要传入一个Time类型的参数size,表示滚动窗口的大小,我们这里创建了一个长度为5秒的滚动窗口。
另外,.of()还有一个重载方法,可以传入两个Time类型的参数:size和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。
(2)滑动处理时间窗口
窗口分配器由类SlidingProcessingTimeWindows提供,同样需要调用它的静态方法.of()。
stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(...)
这里.of()方法需要传入两个Time类型的参数:size和slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。
滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。
(3)处理时间会话窗口
窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。
stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)
这里.withGap()方法需要传入一个Time类型的参数size,表示会话的超时时间,也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。
另外,还可以调用withDynamicGap()方法定义session gap的动态提取逻辑。
(4)滚动事件时间窗口
窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致。
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)
(5)滑动事件时间窗口
窗口分配器由类SlidingEventTimeWindows提供,用法与滑动处理事件窗口完全一致。
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(...)
(6)事件时间会话窗口
窗口分配器由类EventTimeSessionWindows提供,用法与处理事件会话窗口完全一致。
stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)
6.1.4.2 计数窗口
计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类,下面我们就来看它们的具体实现。
(1)滚动计数窗口
滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。
stream.keyBy(...).countWindow(10)
我们定义了一个长度为10的滚动计数窗口,当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口。
(2)滑动计数窗口
与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。
stream.keyBy(...).countWindow(10,3)
我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据,每隔3个数据就统计输出一次结果。
(3)全局窗口
全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由GlobalWindows类提供。
stream.keyBy(...).window(GlobalWindows.create());
需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。
6.1.5 窗口函数
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。下面我们来进行分别讲解。
6.1.5.1 增量聚合函数(ReduceFunction / AggregateFunction)
窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。
1)归约函数(ReduceFunction)
代码示例:
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).keyBy(r -> r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("调用reduce方法,之前的结果:"+value1 + ",现在来的数据:"+value2);return new WaterSensor(value1.getId(), System.currentTimeMillis(),value1.getVc()+value2.getVc());}}).print();env.execute();}
}
2)聚合函数(AggregateFunction)
ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API中的aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。
AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。
接口中有四个方法:
- createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
- add():将输入的元素添加到累加器中。
- getResult():从累加器中提取聚合的输出结果。
- merge():合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
代码实现如下:
public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}});aggregate.print();env.execute();}
}
另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。
6.1.5.2 全窗口函数(full window functions)
有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。
所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。
1)窗口函数(WindowFunction)
WindowFunction字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。
stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());
这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。
不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用。
2)处理窗口函数(ProcessWindowFunction)
ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。
事实上,ProcessWindowFunction是Flink底层API——处理函数(process function)中的一员,关于处理函数我们会在后续章节展开讲解。
代码实现如下:
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperator<String> process = sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long count = elements.spliterator().estimateSize();long windowStartTs = context.window().getStart();long windowEndTs = context.window().getEnd();String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();env.execute();}
}
6.1.5.3 增量聚合和全窗口函数的结合使用
在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T,R,K,W> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<T,R,K,W> function)// AggregateFunction与WindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,WindowFunction<V,R,K,W> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACC,V,R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T,ACC,V> aggFunction,ProcessWindowFunction<V,R,K,W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。
具体实现代码如下:
public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// 1. 窗口分配器WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数:/*** 增量聚合 Aggregate + 全窗口 process* 1、增量聚合函数处理数据: 来一条计算一条* 2、窗口触发时, 增量聚合的结果(只有一条) 传递给 全窗口函数* 3、经过全窗口函数的处理包装后,输出** 结合两者的优点:* 1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少* 2、全窗口函数: 可以通过 上下文 实现灵活的功能*/// sensorWS.reduce() //也可以传两个SingleOutputStreamOperator<String> result = sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{@Overridepublic Integer createAccumulator() {System.out.println("创建累加器");return 0;}@Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println("调用add方法,value="+value);return accumulator + value.getVc();}@Overridepublic String getResult(Integer accumulator) {System.out.println("调用getResult方法");return accumulator.toString();}@Overridepublic Integer merge(Integer a, Integer b) {System.out.println("调用merge方法");return null;}}// 全窗口函数的输入类型 = 增量聚合函数的输出类型public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{@Overridepublic void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}
}
这里我们为了方便处理,单独定义了一个POJO类UrlViewCount来表示聚合输出结果的数据类型,包含了url、浏览量以及窗口的起始结束时间。用一个AggregateFunction来实现增量聚合,每来一个数据就计数加一;得到的结果交给ProcessWindowFunction,结合窗口信息包装成我们想要的UrlViewCount,最终输出统计结果。
6.1.6 其他API
对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink还提供了其他一些可选的API,让我们可以更加灵活地控制窗口行为。
6.1.6.1 触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。
基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...).window(...).trigger(new MyTrigger())
6.1.6.2 移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...).window(...).evictor(new MyEvictor())
6.2 时间语义
6.2.1 Flink中的时间语义
6.2.2 哪种时间语义更重要
1)从《星球大战》说起
为了更加清晰地说明两种语义的区别,我们来举一个非常经典的例子:电影《星球大战》。
如上图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。
2)数据处理系统中的时间语义
在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。
6.3 水位线(Watermark)
6.3.1 事件时间和窗口
6.3.2 什么是水位线
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
6.3.3 水位线和窗口的工作原理
注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。
6.3.4 生成水位线
6.3.4.1 生成水位线的总体原则
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
6.3.4.2 水位线生成策略
在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:
DataStream<Event> stream = env.addSource(new ClickSource());DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(<watermark strategy>);
说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{// 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式,基于时间戳生成水位线@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
6.3.4.3 Flink内置水位线
1)有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:升序的watermark,没有等待时间.<WaterSensor>forMonotonousTimestamps()// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
2)乱序流中内置水位线设置
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.1 指定watermark生成:乱序的,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器,从数据中提取.withTimestampAssigner((element, recordTimestamp) -> {// 返回的时间戳,要 毫秒System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor -> sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
6.3.4.4 自定义水位线生成器
1)周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:
import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element,long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}
我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。
如果想修改默认周期时间,可以通过下面方法修改。例如:修改为400ms
env.getConfig().setAutoWatermarkInterval(400L);
2)断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。
3)在数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)
6.3.5 水位线的传递
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
案例:6.3.4.3 中乱序流的watermark,将并行度设为2,观察现象。
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。
用5.3.4.6中的自定义分区器,只输入奇数来模拟部分subtask无数据,代码如下:
public class WatermarkIdlenessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("hadoop102", 7777).partitionCustom(new MyPartitioner(), r -> r).map(r -> Integer.parseInt(r)).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) //空闲等待5s);// 分成两组: 奇数一组,偶数一组 , 开10s的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {@Overridepublic void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}}).print();env.execute();}
}
6.3.6 迟到数据的处理
6.3.6.1 推迟水印推进
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
6.3.6.2 设置窗口延迟关闭
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。
以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
注意:
允许迟到只能运用在event time上
6.3.6.3 使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)
完整案例代码如下:
public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}
6.4 基于时间的合流——双流联结(Join)
可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。
不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。
6.4.1 窗口联结(Window Join)
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
1)窗口联结的调用
窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下:
stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。
这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。
而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。
传入的JoinFunction也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。
其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。
2)窗口联结实例
代码实现:
public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer,Integer>> ds2 = env.fromElements(Tuple3.of("a", 1,1),Tuple3.of("a", 11,1),Tuple3.of("b", 2,1),Tuple3.of("b", 12,1),Tuple3.of("c", 14,1),Tuple3.of("d", 15,1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key,来进行匹配关联// 3. 只能拿到匹配上的数据,类似有固定时间范围的inner joinDataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0) // ds1的keyby.equalTo(r2 -> r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用join方法* @param first ds1的数据* @param second ds2的数据* @return* @throws Exception*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}
6.4.2 间隔联结(Interval Join)
在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。
为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。
1)间隔联结的原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。
2)间隔联结的调用
间隔联结在代码中,是基于KeyedStream的联结(join)操作。DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。
通用调用形式如下:
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
可以看到,抽象类ProcessJoinFunction就像是ProcessFunction和JoinFunction的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中left指的就是第一条流中的数据,right则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。
3)间隔联结实例
案例需求:在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
(1)代码实现:正常使用
public class IntervalJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));// TODO interval join//1. 分别做keyby,key其实就是关联条件KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);//2. 调用 interval joinks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1的数据* @param right ks2的数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}}).print();env.execute();}
}
(2)代码实现:处理迟到数据
public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.socketTextStream("hadoop102", 7777).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] datas = value.split(",");return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.socketTextStream("hadoop102", 8888).map(new MapFunction<String, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> map(String value) throws Exception {String[] datas = value.split(",");return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));/*** TODO Interval join* 1、只支持事件时间* 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后* 3、process中,只能处理 join上的数据* 4、两条流关联后的watermark,以两条流中最小的为准* 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理* => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流*///1. 分别做keyby,key其实就是关联条件KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);//2. 调用 interval joinOutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1的数据* @param right ks2的数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();}
}
相关文章:
【Flink系列】6. Flink中的时间和窗口
6. Flink中的时间和窗口 在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。 所谓的“窗口”ÿ…...
代码随想录算法训练营第三十五天-动态规划-01背包(二维)
动规五部曲 dp数组的含义,注意这是一个二维数组。dp[i][j] 第一维度代表“从0到第i个物品,而且包括选或不选的情况,即这一维度代表物品编号第二维度代表代表背包容量合在一起的意思是当背包容量是j时,从0到i个物品中选择任意物品…...
快速开发:用AI构造AI —— 打造属于个人的Copilot(M-聪明AI)
作品简介: 当今快速发展的AI时代,学会使用AI的同时,也可以融入AI,来打造自己的产品,我给我这个取名M-聪明, 是基于VUE 3 Spring Boot -Redis ChatGML RxJava SSE 的AI 服务平台。然后这款工具旨在为用户…...
Elasticsearch容器启动报错:AccessDeniedException[/usr/share/elasticsearch/data/nodes];
AccessDeniedException 表明 Elasticsearch 容器无法访问或写入数据目录 /usr/share/elasticsearch/data/nodes。这是一个权限问题。 问题原因: 1、宿主机目录权限不足:映射到容器的数据目录 /data/es/data 在宿主机上可能没有足够的权限供容器访问。 …...
用公网服务器实现内网穿透
首先需要一个公网服务器 下载frp 搜索github下载到frp,服务端frps/客户端frpc。。下载的时候要注意自己本地内网机的cpu版本和服务端cpu架构 我的电脑是mac M1PRO版本 下载的是:darwinarm64 比如 服务端一般是Linux(Intel 64位CPU…...
Jmeter如何进行多服务器远程测试
🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 JMeter是Apache软件基金会的开源项目,主要来做功能和性能测试,用Java编写。 我们一般都会用JMeter在本地进行测试,但是受到单…...
前端实习第二个月小结
时间飞快,第一次实习已经过去两个多月,作一些简单的总结和分享。 注:文章整体会比较轻松,提及的经历、经验仅作参考。 一、关于实习/工作内容 1、工作内容 近期做的是管理后台方面的业务,技术栈:前端re…...
C# 并发和并行的区别--16
目录 并发和并行 一.并发 定义 特点 代码示例 代码解释 二.并行 定义 特点 在C#中的体现 代码示例 代码解释 三.并发和并行的区别 四 .如何在C#中选择并发还是并行 1.考虑任务类型 2.代码示例 3.注意事项 五.总结 并发和并行 在编程领域,并发和并行是两个密切…...
Python编程与在线医疗平台数据挖掘与数据应用交互性研究
一、引言 1.1 研究背景与意义 在互联网技术飞速发展的当下,在线医疗平台如雨后春笋般涌现,为人们的就医方式带来了重大变革。这些平台打破了传统医疗服务在时间和空间上的限制,使患者能够更加便捷地获取医疗资源。据相关报告显示,中国基于互联网的医疗保健行业已进入新的…...
HBase实训:纸币冠字号查询任务
一、实验目的 1. 理解分布式数据存储系统HBase的架构和工作原理。 2. 掌握HBase表的设计原则,能够根据实际业务需求设计合理的表结构。 3. 学习使用HBase Java API进行数据的插入、查询和管理。 4. 实践分布式数据存储系统在大数据环境下的应用,…...
Java 读取 Windows 设备的唯一性标识及定位
在 Windows 系统中,获取设备唯一性标识及定位信息对设备管理、安全监控等场景意义重大。本文介绍 Java 中几种实现方法,如 JNA 库、WMI4Java 库及通过 JNI 结合 Windows API。 1. 使用 JNA 库读取 DEVPKEY_Device_ContainerId 在 Windows 系统中&…...
UE控件学习
ListView: item设置:使能在list设置为Entry类 Grid Panel: 常用作背包,每个格子大小可不相同 WidgetSwitcher: 用于切换页签 Wrap_Box: 自动横向排版子节点,超过一定范围则自动换行…...
1.Spring AI 从入门到实践
Spring AI 从入门到实践 1.什么是Spring AI 2.使用Spring Boot&Spring AI快速构建AI应用程序 3.ChatClient&Chat Model简化与AI模型的交互 4.Spring AI Prompt:与大模型进行有效沟通 5.结构化输出大模型响应 6.实战:AI聊天机器人 Ben技术站关注Java技术&#x…...
2025年01月蓝桥杯Scratch1月stema选拔赛真题—美丽的图形
美丽的图形 编程实现美丽的图形具体要求: 1)点击绿旗,角色在舞台中心,如图所示; 2)1秒后,绘制一个边长为 140的红色大正方形,线条粗细为 3,正方形的中心为舞台中心,如图所示; 完整题目可点击下…...
FLASK创建下载
html用a标签 <!-- Button to download the image --> <a href"{{ url_for(download_file, filenameimage.png) }}"><button>Download Image</button> </a> 后端:url_for双大括号即是用来插入变量到模板中的语法。也就是绑…...
LDD3学习7--硬件接口I/O端口(以short为例)
1 理论 1.1 基本概念 目前对外设的操作,都是通过寄存器。寄存器的概念,其实就是接口,访问硬件接口,有I/O端口通信和内存映射I/O (Memory-Mapped I/O),I/O端口通信是比较老的那种,都是老的串口并口设备&am…...
MySQL(高级特性篇) 06 章——索引的数据结构
一、为什么使用索引 索引是存储引擎用于快速找到数据记录的一种数据结构,就好比一本教科书的目录部分,通过目录找到对应文章的页码,便可快速定位到需要的文章。MySQL中也是一样的道理,进行数据查找时,首先查看查询条件…...
【FlutterDart】MVVM(Model-View-ViewModel)架构模式例子-http版本(30 /100)
动图更精彩 MVVM(Model-View-ViewModel) 特点 Model:负责数据管理和业务逻辑。 View:负责显示数据,通常是一个UI组件。 ViewModel:负责处理用户交互,更新Model,并将数据转换为View可…...
光谱相机的光谱分辨率可以达到多少?
多光谱相机 多光谱相机的光谱分辨率相对较低,波段数一般在 10 到 20 个左右,光谱分辨率通常在几十纳米到几百纳米之间,如常见的多光谱相机光谱分辨率为 100nm 左右。 高光谱相机 一般的高光谱相机光谱分辨率可达 2.5nm 到 10nm 左右&#x…...
.Net8 Avalonia跨平台UI框架——<vlc:VideoView>控件播放海康监控、摄像机视频(Windows / Linux)
一、UI效果 二、新建用户控件:VideoViewControl.axaml 需引用:VideoLAN.LibVLC.Windows包 Linux平台需安装:VLC 和 LibVLC (sudo apt-get update、sudo apt-get install vlc libvlccore-dev libvlc-dev) .axaml 代码 注…...
【论文阅读】基于空间相关性与Stacking集成学习的风电功率预测方法
文章目录 摘要0. 引言1. 空间相关性分析2. 风电功率预测模型2.1 Stacking 集成策略2.2 基学习器2.2.1 基于机器学习算法的基学习器2.2.2 基于神经网络的基学习器2.2.3 基于粒子群优化算法的超参数优化 2.3 元学习器2.4 基于空间相关性与Stacking集成学习的风电功率预测方法 3 算…...
什么是Spring Boot 应用开发?
一、引言 在当今的软件开发领域,Java 依然占据着重要的地位,而 Spring Boot 作为 Java 生态系统中极具影响力的框架,极大地简化了企业级应用的开发流程,提升了开发效率和应用的可维护性。它基于 Spring 框架构建,通过…...
选择saas 还是源码主要考虑
公司业务规模:小型企业可能会发现SaaS提供的即用型解决方案更符合其需求,而大型企业可能需要源码以实现更高的定制性和控制权。 公司技术专长:缺乏技术团队的企业可能会倾向于使用SaaS,而那些拥有强大IT部门的企业可能更适合管理…...
【JAVA 基础 第(19)课】Hashtable 类用法和注意细节,是Map接口的实现类
Map接口:存放的是具有映射关系的键值对,键映射到值,键必须是唯一的 Hashtable 类,Map接口的实现类,键和值都不能为nullHashtable 是同步的,是线程安全的 public class MapTest {public static void main(String[] arg…...
AI时代下 | 通义灵码冲刺备战求职季
AI时代下 | 通义灵码冲刺备战求职季 什么是通义灵码使用智能编程助手备战求职靠谱吗体验心得 AI时代下,备战求职季有了不一样的方法,使用通义灵码冲刺备战求职季,会有什么样的体验? 什么是通义灵码 在开始话题之前,首…...
如何将 session 共享存储到 redis 中
文章目录 一. 分布式 session 登录1.1 什么是分布式?1.2 Session 共享1.3 为什么服务器 A 登录后,请求发到服务器 B,不认识该用户?1.4 共享存储 二. Session 共享实现Redis三. 测试session共享四. cookie设置4.1 前端4.2 后端 一.…...
智能科技与共情能力加持,哈曼重新定义驾乘体验
2025年1月6日,拉斯维加斯,2025年国际消费电子展——想象一下,当您步入一辆汽车,它不仅能响应您的指令,更能理解您的需求、适应您的偏好,并为您创造一个独特且专属的交互环境。作为汽车科技领域的知名企业和…...
第4章 Kafka核心API——Kafka客户端操作
Kafka客户端操作 一. 客户端操作1. AdminClient API 一. 客户端操作 1. AdminClient API...
Debian 设定 tomcat 定时重启
目录 背景 过程记录 1、编辑sh文件,完成重启功能 2、设置sh的可执行权限 编辑 3、设置定时任务 背景 在Debian 12系统中,原本部署了两个tomcat,结果总是遇到CPU飙升到影响应用正常使用的程度,找了很久原因还是没有找到。 …...
mysql8.0 重要指标参数介绍
MySQL 8.0 引入了许多新的功能和优化,针对性能、可扩展性、可靠性以及安全性方面做出了显著改进。为了确保 MySQL 的高效运行,了解和配置 MySQL 的一些关键指标参数非常重要。以下是 MySQL 8.0 中的一些重要参数和指标,帮助你优化数据库性能。…...
SpringMVC (2)
目录 1. RequestMapping 注解介绍 2. RequestMapping 使用 3. RequestMapping与请求方式 3.1 RequestMapping 支持Get和Post类型的请求 3.2 RequestMapping 指定接收某种请求 3.3 GetMapping和PostMapping 4. 传参 4.1 通过查询字符串传参 4.2 在 Body 中传参 4.2.1 …...
【全面解析】深入解析 TCP/IP 协议:网络通信的基石
深入解析 TCP/IP 协议:网络通信的基石 导语 你是否曾好奇,现代互联网是如何实现全球设备之间的高速、稳定和可靠通信的?无论是浏览网页、发送电子邮件,还是进行视频通话,背后都离不开 TCP/IP 协议 的支撑。作为互联网…...
图数据库 | 19、高可用分布式设计(下)
相信大家对分布式系统设计与实现的复杂性已经有了一定的了解,本篇文章对分布式图数据库系统中最复杂的一类系统架构设计进行探索,即水平分布式图数据库系统(这个挑战也可以泛化为水平分布式图数据仓库、图湖泊、图中台或任何其他依赖图存储、…...
【2024年华为OD机试】 (C卷,200分)- 反射计数(Java JS PythonC/C++)
一、问题描述 题目解析 题目描述 给定一个包含 0 和 1 的二维矩阵,一个物体从给定的初始位置出发,在给定的速度下进行移动。遇到矩阵的边缘时会发生镜面反射。无论物体经过 0 还是 1,都不影响其速度。请计算并给出经过 t 时间单位后&#…...
【微服务】SpringCloud 1-9章
1从Boot和Cloud版本选型开始说起 1.1Springboot版本选择 1.1.1git源码地址 https://github.com/spring-projects/spring-boot/releases/ 1.1.2官网看Boot版本 1.1.3SpringBoot3.0崛起 https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-3.0-Release-Notes …...
Jmeter进行http接口并发测试
目录: 1、Jmeter设置(1)设置请求并发数(2)设置请求地址以及参数(3)添加结果数 2、启动看结果 1、Jmeter设置 (1)设置请求并发数 (2)设置请求地址…...
JavaScript语言的数据结构
JavaScript中的数据结构 引言 在编程的世界里,数据结构是处理和组织数据的重要方式。数据结构的选择往往直接影响到程序的性能和可维护性。JavaScript作为一门广泛使用的编程语言,在数据结构的设计和使用上也有其独特的特点。本文将深入探讨JavaScript…...
【数据分享】1929-2024年全球站点的逐日平均气温数据(Shp\Excel\免费获取)
气象数据是在各项研究中都经常使用的数据,气象指标包括气温、风速、降水、湿度等指标,其中又以气温指标最为常用!说到气温数据,最详细的气温数据是具体到气象监测站点的气温数据!本次我们为大家带来的就是具体到气象监…...
DETRs with Collaborative Hybrid Assignments Training论文阅读与代码
关键词:协作混合分配训练 【目标检测】Co-DETR:ATSS+Faster RCNN+DETR协作的先进检测器(ICCV 2023)-CSDN博客 摘要: 在这篇论文中,作者观察到在DETR中将过少的 Query 分配为正样本,采用一对一的集合匹配,会导致对编码器输出的监督稀疏,严重损害编码器的区分特征学习…...
某国际大型超市电商销售数据分析和可视化
完整源码项目包获取→点击文章末尾名片! 本作品将从人、货、场三个维度,即客户维度、产品维度、区域维度(补充时间维度与其他维度)对某国际大型超市的销售情况进行数据分析和可视化报告展示,从而为该超市在弄清用户消费…...
码云gitee 新建仓库 添加公钥
码云gitee 新建仓库 添加公钥 文章目录 码云gitee 新建仓库 添加公钥新建仓库生成公钥管理个人公钥安全验证 码云这个网站是一个代码托管平台,在国内可以无限制的使用,在这个网站上,也可以搜索到一些github上面的内容。进入这个网站ÿ…...
SQL 基础教程 - SQL SELECT INTO 语句
通过 SQL,您可以从一个表复制信息到另一个表。 SELECT INTO 语句从一个表复制数据,然后把数据插入到另一个新表中。 SQL SELECT INTO 语句 SELECT INTO 语句从一个表复制数据,然后把数据插入到另一个新表中。 注意: MySQL 数据…...
《leetcode-runner》如何手搓一个debug调试器——指令系统
前文: 《leetcode-runner》如何手搓一个debug调试器——引言 《leetcode-runner》如何手搓一个debug调试器——架构 文章目录 什么是指令系统指令的组成部分leetcode-runner支持哪些指令如何解析用户输入的命令行指令识别流程 仓库地址:leetcode-runner …...
基于预共享密钥的IPsec实验
一、实验目的 (1)了解IPsec的原理和协议运行机制; (2)掌握IPsec身份认证的预共享密钥的配置; (3)掌握用Wireshark工具抓包分析IPsec数据包格式和协议流程。 二、实验设备与环境 &…...
Golang Gin系列-2:搭建Gin 框架环境
开始网络开发之旅通常是从选择合适的工具开始的。在这个全面的指南中,我们将引导你完成安装Go编程语言和Gin框架的过程,Gin框架是Go的轻量级和灵活的web框架。从设置Go工作空间到将Gin整合到项目中,本指南是高效而强大的web开发路线图。 安装…...
R语言绘图
多组火山图 数据准备: 将CSV文件同一在一个路径下,用代码合并 确保文件列名正确 library(fs) library(dplyr) library(tidyr) library(stringr) library(ggplot2) library(ggfun) library(ggrepel)# 获取文件列表 file_paths <- dir_ls(path &quo…...
Linux《Linux简介与环境的搭建》
在学习了C或者是C语言的基础知识之后就可以开始Linux的学习了,现在Linux无论是在服务器领域还是在桌面领域都被广泛的使用,所以Linxu也是我们学习编程的重要环节,在此接下来我们将会花大量的时间在Linxu的学习上。在学习Linux初期你可以会像初…...
.Net Core webapi 实现JWT认证
文章目录 需求准备创建JWT配置创建JWTService注册JWT创建中间件读取jwt的token在需要的接口上添加属性启动认证启动swagger的授权认证使用 需求 实现一个记录某个用户所有操作的功能 准备 创建你的webapi项目从nuget下载安装JWT资源包根据你的项目使用.net版本下载对应的jwt…...
SDL2:Android APP编译使用 -- SDL2多媒体库使用音频实例
SDL2:Android APP编译使用 3. SDL2:Android APP编译使用3.1 Android Studio环境准备:3.2 构建Android APP(1)方式一:快速构建APK工程(2)方式二:自定义APK工程(…...
gitignore忽略已经提交过的
已经在.gitignore文件中添加了过滤规则来忽略bin和obj等文件夹,但这些文件夹仍然出现在提交中,可能是因为这些文件夹在添加.gitignore规则之前已经被提交到Git仓库中了。要解决这个问题,您需要从Git的索引中移除这些文件夹,并确保…...