当前位置: 首页 > news >正文

Flink四大基石之窗口(Window)使用详解

目录

一、引言

二、为什么需要 Window

三、Window 的控制属性

窗口的长度(大小)

窗口的间隔

四、Flink 窗口应用代码结构

是否分组

Keyed Window --键控窗

Non-Keyed Window

核心操作流程

五、Window 的生命周期

分配阶段

触发计算

六、Window 的分类

滚动窗口- TumblingWindow概念

滑动窗口– SlidingWindow概念

会话窗口 [了解]

七、Windows Function 窗口函数

分类剖析

增量聚合函数(以 AggregateFunction 为例)

全量聚合函数

八、案例实战

案例一

滚动窗口演示        

滑动窗口演示

热词统计案例

kafka发送消息的模板代码

九、总结


        本文深入探讨 Flink 中高级 API 里窗口(Window)的相关知识,涵盖为什么需要窗口、其控制属性、应用代码结构、生命周期、分类,以及窗口函数的各类细节,并辅以实例进行讲解,旨在助力开发者透彻理解并熟练运用 Flink 的窗口机制处理流数据。

一、引言

        在大数据实时处理领域,Apache Flink 凭借其卓越性能与丰富功能占据重要地位。而窗口(Window)作为 Flink 从流处理(Streaming)到批处理(Batch)的关键桥梁,理解与掌握其使用对高效数据处理意义非凡,接下来将全方位剖析其奥秘。

二、为什么需要 Window

        在流处理场景中,数据如潺潺溪流般持续涌入、无休无止。但诸多业务场景要求我们对特定时段数据做聚合操作,像统计 “过去的 1 分钟内有多少用户点击了我们的网页”。若不划定范围,面对无尽数据洪流,根本无法开展有针对性计算。窗口恰似神奇 “箩筐”,按规则收集一定时长或一定数据量数据,将无限流拆分成有限 “桶”,便于精准计算,满足如 “每隔 10min,计算最近 24h 的热搜词” 这类实时需求。

三、Window 的控制属性

窗口的长度(大小)

        明确要计算最近多久的数据,以时间维度举例,若关注 24 小时内热搜词数据量,那 24 小时即窗口长度;计数维度下,设定统计前 N 条数据,N 就是计数窗口的长度规格。

窗口的间隔

        决定隔多久进行一次计算操作。像 “每隔 10min,计算最近 24h 的热搜词” 里,每隔 10 分钟便是间隔设定,它把控着计算频次节奏。

四、Flink 窗口应用代码结构

是否分组

        首先要判定是否依 Key 对 DataStream 分组,经 keyBy 操作后,数据流成多组,下游算子多实例可并行跑,提效显著;若用 windowAll 则不分组,所有数据送下游单个实例(并行度为 1),后续窗口操作逻辑与分组情形(Keyed Window)类似,仅执行主体有别。

Keyed Window --键控窗

// Keyed Window
stream.keyBy(...)              <-  按照一个Key进行分组.window(...)            <-  将数据流中的元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process/apply()      <-  窗口处理函数Window Function

Non-Keyed Window

// Non-Keyed Window
stream.windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中[.trigger(...)]            <-  指定触发器Trigger(可选)[.evictor(...)]            <-  指定清除器Evictor(可选).reduce/aggregate/process()      <-  窗口处理函数Window Function

核心操作流程

        借助窗口分配器(WindowAssigner)依时间(Event Time 或 Processing Time)把数据流元素 “分拣” 进对应窗口;待满足触发条件(常是窗口结束时间到等情况),用窗口处理函数(如 reduce、aggregate、process 等常用函数)处理窗口内数据,此外,trigger、evictor 是面向高级自定义需求的触发、销毁附加项,默认配置也能应对常见场景。

五、Window 的生命周期

分配阶段

        窗口分配器依据设定规则(像按时间间隔、计数规则等),为流入数据 “找家”,安置到合适窗口 “桶” 内,确定数据归属,构建基础计算单元。

触发计算

        当预设触发条件达成,如时间窗口到结束点,对应窗口函数 “登场”,对窗口内数据按既定逻辑聚合处理,不同窗口函数(reduce、aggregate、process)处理细节、能力有差异,像 process 更底层、功能更强大,自带 open/close 生命周期方法且能获取 RuntimeContext。

        上图是窗口的生命周期示意图,假如我们设置的是一个10分钟的滚动窗口,第一个窗口的起始时间是0:00,结束时间是0:10,后面以此类推。当数据流中的元素流入后,窗口分配器会根据时间(Event Time或Processing Time)分配给相应的窗口。相应窗口满足了触发条件,比如已经到了窗口的结束时间,会触发相应的Window Function进行计算。注意,本图只是一个大致示意图,不同的Window Function的处理方式略有不同。

 

        从数据类型上来看,一个DataStream经过keyBy转换成KeyedStream,再经过window转换成WindowedStream,我们要在之上进行reduce、aggregate或process等Window Function,对数据进行必要的聚合操作。

六、Window 的分类

Window可以分成两类:

CountWindow按指定数据条数生成窗口,与时间脱钩。

滚动计数窗口:每隔 N 条数据,聚焦统计前 N 条,如每来 10 条统计前 10 条信息。

滑动计数窗口:每隔 N 条数据,统计前 M 条(N≠M),像每过 20 条统计前 15 条情况。

TimeWindow(重点):基于时间划定窗口。

滚动时间窗口:每隔 N 时间,统计前 N 时间范围数据,如每隔 5 分钟统计前 5 分钟车辆通过量,窗口长度与滑动距离均为 5 分钟。

滑动时间窗口:每隔 N 时间,统计前 M 时间范围数据(M≠N),像每隔 30 秒统计前 1 分钟车辆数据,窗口长度 1 分钟、滑动距离 30 秒。

会话窗口:设会话超时时间(如 10 分钟),期间无数据来则结算上一窗口数据,按毫秒精细界定范围,与 Key 值关联紧密,Key 值无新输入达设定时长就统计,不受全局新数据流入干扰。

滚动窗口- TumblingWindow概念

流是连续的,无界的(有明确的开始,无明确的结束

假设有个红绿灯,提出个问题:计算一下通过这个路口的汽车数量

对于这个问题,肯定是无法回答的,为何?

因为,统计是一种对固定数据进行计算的动作。

因为流的数据是源源不断的,无法满足固定数据的要求(因为不知道何时结束)

那么,我们换个问题:统计1分钟内通过的汽车数量

那么,对于这个问题,我们就可以解答了。因为这个问题确定了数据的边界,从无界的流数据中,取出了一部分有边界的数据子集合进行计算。

描述完整就是:每隔1分钟,统计这1分钟内通过汽车的数量。窗口长度是1分钟,时间间隔是1分钟,所以这样的窗口就是滚动窗口。

那么,这个行为或者说这个统计的数据边界,就称之为窗口。

同时,我们的问题,是以时间来划分被处理的数据边界的,那么按照时间划分边界的就称之为:时间窗口

反之,如果换个问题,统计100辆通过的车里面有多少宝马品牌,那么这个边界的划分就是按照数量的,这样的称之为:计数窗口

同时,这样的窗口被称之为滚动窗口,按照窗口划分依据分为:滚动时间窗口、滚动计数窗口。

滑动窗口– SlidingWindow概念

同样是需求,改为:

每隔1分钟,统计前面2分钟内通过的车辆数

对于这个需求我们可以看出,窗口长度是2分钟,每隔1分钟统计一次,窗口长度和时间间隔不相等,并且是大于关系,就是滑动窗口

或者:每通过100辆车,统计前面通过的50辆车的品牌占比

对于这个需求可以看出,窗口长度是50辆车,但是每隔100辆车统计一次

对于这样的窗口,我们称之为滑动窗口。

那么在这里面,统计多少数据是窗口长度(如统计2分钟内的数据,统计50辆车中的数据)

隔多久统计一次称之为滑动距离(如,每隔1分钟,每隔100辆车)

那么可以看出,滑动窗口,就是滑动距离不等于窗口长度的一种窗口

比如,每隔1分钟 统计先前5分钟的数据,窗口长度5分钟,滑动距离1分钟,不相等

比如,每隔100条数据,统计先前50条数据,窗口长度50条,滑动距离100条,不相等

那如果相等呢?相等就是比如:每隔1分钟统计前面1分钟的数据,窗口长度1分钟,滑动距离1分钟,相等。

对于这样的需求可以简化成:每隔1分钟统计一次数据,这就是前面说的滚动窗口

那么,我们可以看出:

滚动窗口:窗口长度= 滑动距离

滑动窗口:窗口长度!= 滑动距离

总结:其中可以发现,对于滑动窗口:

滑动距离> 窗口长度,会漏掉数据,比如:每隔5分钟,统计前面1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟的数据)这样的东西,没人用。

滑动距离< 窗口长度,会重复处理数据,比如:每隔1分钟,统计前面5分钟的数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟的数据)

滑动距离= 窗口长度,不漏也不会重复,也就是滚动窗口

窗口的长度(大小) > 窗口的间隔 : 如每隔5s, 计算最近10s的数据 【滑动窗口】

窗口的长度(大小) = 窗口的间隔: 如每隔10s,计算最近10s的数据 【滚动窗口】

窗口的长度(大小) < 窗口的间隔: 每隔15s,计算最近10s的数据 【没有名字,不用】

会话窗口 [了解]

Session 会话,一次会话。就是谈话。

设置一个会话超时时间间隔即可, 如10分钟,那么表示:

如果10分钟没有数据到来, 就计算上一个窗口的数据

代码中,并行度设置为1,测试比较 方便。

窗口的范围:

窗口的判断是按照毫秒为单位

如果窗口长度是5秒

窗口的开始: start

窗口的结束: start + 窗口长度 -1 毫秒

比如窗口长度是5秒, 从0开始

那么窗口结束是: 0 + 5000 -1 = 4999

七、Windows Function 窗口函数

分类剖析

全量函数:耐心缓存窗口所有元素,直至触发条件成熟,才对全量数据 “开刀” 计算,此特性可满足数据排序等复杂需求。

增量函数:保存中间数据 “蓝本”,新元素流入就与之融合更新,持续迭代中间成果,高效且灵活。

增量聚合函数(以 AggregateFunction 为例)

        每有新数据 “入局”,立马按规则计算,其接口含输入类型(IN)、累加器类型(ACC)、输出类型(OUT)参数,有对应 add、createAccumulator、merge、extractOutput 等方法,构建严谨聚合流程。

实现方法(常见的增量聚合函数如下):
reduce(reduceFunction)
aggregate(aggregateFunction)
sum()
min()
max()

reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 <T>
maxBy、minBy、sum这3个底层都是由reduce实现的
aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>

AggregateFunction 【了解】

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

输入类型是输入流中的元素类型,AggregateFunction有一个add方法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。

package com.bigdata.windows;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _04_AggDemo {public static final Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L)};public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//2. source-加载数据DataStreamSource<Tuple3<String,String,Long>> dataStreamSource = env.fromElements(ENGLISH);KeyedStream<Tuple3<String,String,Long>, String> keyedStream = dataStreamSource.keyBy(new KeySelector<Tuple3<String,String,Long>, String>() {@Overridepublic String getKey(Tuple3<String,String,Long> tuple3) throws Exception {return tuple3.f0;}});//3. transformation-数据处理转换// 三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)keyedStream.countWindow(3).aggregate(new AggregateFunction<Tuple3<String,String,Long>, Tuple3<String,Long,Integer>, Tuple2<String,Double>>() {// 初始化一个中间变量Tuple3<String,Long,Integer> tuple3 = Tuple3.of(null,0L,0);@Overridepublic Tuple3<String,Long,Integer> createAccumulator() {return tuple3;}@Overridepublic Tuple3<String,Long,Integer> add(Tuple3<String, String, Long> value, Tuple3<String,Long,Integer> accumulator) {long tempScore = value.f2 + accumulator.f1;int length = accumulator.f2 + 1;return Tuple3.of(value.f0, tempScore,length);}@Overridepublic Tuple2<String, Double> getResult( Tuple3<String,Long,Integer> accumulator) {return Tuple2.of(accumulator.f0,(double) accumulator.f1 / accumulator.f2);}@Overridepublic Tuple3<String, Long, Integer> merge(Tuple3<String, Long, Integer> a, Tuple3<String, Long, Integer> b) {return Tuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

全量聚合函数

        坚守等窗口数据集齐 “发令枪响” 才运算原则,确保计算基于完整数据集,保障结果准确性、完整性,契合多场景聚合诉求。

实现方法
apply(windowFunction)
process(processWindowFunction)

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。
ProcessWindowFunction一次性迭代整个窗口里的所有元素,比较重要的一个对象是Context,可以获取到事件和状态信息,这样我们就可以实现更加灵活的控制,该算子会浪费很多性能,主要原因是不增量计算,要缓存整个窗口然后再去处理,所以要设计好内存。

package com.bigdata.day04;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;public class Demo03 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Tuple3[] ENGLISH = new Tuple3[] {Tuple3.of("class1", "张三", 100L),Tuple3.of("class1", "李四", 40L),Tuple3.of("class1", "王五", 60L),Tuple3.of("class2", "赵六", 20L),Tuple3.of("class2", "小七", 30L),Tuple3.of("class2", "小八", 50L)};// 先求每个班级的总分数,再求每个班级的总人数DataStreamSource<Tuple3<String,String,Long>> streamSource = env.fromElements(ENGLISH);KeyedStream<Tuple3<String, String, Long>, String> keyedStream = streamSource.keyBy(v -> v.f0);// 每个分区中的数据都达到了3条才能触发,哪个分区达到了三条,哪个就触发,不够的不计算// //Tuple3<String, String, Long> 输入类型//    //Tuple2<Long, Long> 累加器ACC类型,保存中间状态 第一个值代表总成绩,第二个值代表总人数//    //Double 输出类型// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象keyedStream.countWindow(3).apply(new WindowFunction<Tuple3<String, String, Long>, Double, String, GlobalWindow>() {@Overridepublic void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, Long>> input, Collector<Double> out) throws Exception {// 计算总成绩,计算总人数int sumScore = 0,sumPerson=0;for (Tuple3<String, String, Long> tuple3 : input) {sumScore += tuple3.f2;sumPerson += 1;}out.collect((double)sumScore/sumPerson);}}).print();//5. execute-执行env.execute();}
}

八、案例实战

案例一

 需求为 “每 5 秒钟统计一次,最近 5 秒钟内,各个路口通过红绿灯汽车的数量”,借 Flink 代码实现,底层算法作用下,数据按节奏聚合统计,时间设 1 分钟更易观察效果,能清晰看到各时段车辆数统计产出。

nc -lk 9999
有如下数据表示:
信号灯编号和通过该信号灯的车的数量
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

没有添加窗口的写法:

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class Demo07 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);// 9,2 --> (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(String line) throws Exception {String[] arr = line.split(",");int monitor_id = Integer.valueOf(arr[0]);int num = Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple->tuple.f0).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

此处的sum求和,中count ,其实是CartInfo中的一个字段而已。

演示:

滚动窗口演示
        

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Demo08 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);// 9,2 --> (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(String line) throws Exception {String[] arr = line.split(",");int monitor_id = Integer.valueOf(arr[0]);int num = Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple->tuple.f0).window(TumblingProcessingTimeWindows.of(Time.minutes(1))).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

        以上代码的时间最好修改为1分钟,假如时间间隔是1分钟,那么48分03秒时输入的信号灯数据,49分整点会统计出来结果,原因是底层有一个算法。

        滑动窗口的话,不太容易看到效果,因为有些数据被算到了多个窗口中,需要我们拿笔自己计算一下,对比一下:

滑动窗口演示

        同样统计各路口汽车数量,但需求改为 “每 5 秒钟统计一次,最近 10 秒钟内”,因数据会在多窗口重复计算,需手动比对梳理,深入体会滑动窗口数据处理逻辑与特点。

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Demo09 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);// 9,2 --> (9,2)//3. transformation-数据处理转换streamSource.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(String line) throws Exception {String[] arr = line.split(",");int monitor_id = Integer.valueOf(arr[0]);int num = Integer.valueOf(arr[1]);return Tuple2.of(monitor_id,num);}}).keyBy(tuple->tuple.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5))).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

热词统计案例

        借助 Kafka 随机发送 50000 个热词(200 毫秒间隔),分别基于滚动、滑动窗口统计,编写 Flink 代码时着重体会 apply 方法,兼顾二者效果差异,同时知晓工作中 process 函数因更强大底层能力常成首选。

apply和process都是处理全量计算,但工作中正常用process。

process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。

package com.bigdata.day03.time;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;public class Demo10 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties = new Properties();properties.setProperty("bootstrap.servers","bigdata01:9092");properties.setProperty("group.id","g2");//2. source-加载数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("flink-01",new SimpleStringSchema(),properties);DataStreamSource<String> kafkaSource = env.addSource(kafkaConsumer);//3. transformation-数据处理转换kafkaSource.map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value,1);}}).keyBy(tuple->tuple.f0)//.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)))// 第一个泛型是输入数据的类型,第二个泛型是返回值类型   第三个是key 的类型, 第四个是窗口对象.apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {@Overridepublic void apply(String key,  // 代表分组key值     五旬老太守国门TimeWindow window, // 代表窗口对象Iterable<Tuple2<String, Integer>> input, // 分组过之后的数据 [1,1,1,1,1]Collector<String> out  // 用于输出的对象) throws Exception {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long start = window.getStart();long end = window.getEnd();int sum = 0;for (Tuple2<String, Integer> tuple2 : input) {sum += tuple2.f1;}out.collect(key+",窗口开始:"+dateFormat.format(new Date(start))+",结束时间:"+dateFormat.format(new Date(end))+","+sum);//out.collect(key+",窗口开始:"+start+",结束时间:"+end+","+sum);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}

kafka发送消息的模板代码

package com.bigdata.day03.time;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class CustomProducer {public static void main(String[] args) {// Properties 它是map的一种Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建了一个消息生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 调用这个里面的send方法String[] hotWords= new String[]{"郭有才","歌手2024","五旬老太守国门","师夷长技以制夷"};Random random = new Random();for (int i = 0; i < 50000; i++) {String word = hotWords[random.nextInt(4)];ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("flink-01",word);kafkaProducer.send(producerRecord);}kafkaProducer.close();}
}

九、总结

        Flink 窗口机制犹如精密仪器,从控制属性、分类设计到函数运用,各环节紧密相扣。深入理解其原理、熟练实操代码,能为实时流数据处理注入强大动力,解锁更多高效、智能数据聚合分析场景,助力开发者在大数据浪潮中稳立潮头、驾驭数据。后续可深入探索自定义窗口逻辑、优化性能调优等进阶方向,深挖 Flink 窗口潜力。

相关文章:

Flink四大基石之窗口(Window)使用详解

目录 一、引言 二、为什么需要 Window 三、Window 的控制属性 窗口的长度&#xff08;大小&#xff09; 窗口的间隔 四、Flink 窗口应用代码结构 是否分组 Keyed Window --键控窗 Non-Keyed Window 核心操作流程 五、Window 的生命周期 分配阶段 触发计算 六、Wi…...

Easy Excel 通过【自定义批注拦截器】实现导出的【批注】功能

目录 Easy Excel 通过 【自定义批注拦截器】实现导出的【批注】功能需求原型&#xff1a;相关数据&#xff1a;要导出的对象字段postman 格式导出对象VO 自定义批注拦截器业务代码&#xff1a; 拦截器代码解释&#xff1a;详细解释&#xff1a;格式优化&#xff1a; Easy Excel…...

PHP 去掉特殊不可见字符 “\u200e“

描述 最近在排查网站业务时&#xff0c;发现有数据匹配失败的情况 肉眼上完全看不出问题所在 当把字符串 【M24308/23-14F‎】复制出来发现 末尾有个不可见的字符 使用删除键或左右移动时才会发现 最后测试通过 var_dump 打印 发现这个"空字符"占了三个长度 &#xf…...

Flume和kafka的整合:使用Flume将日志数据抽取到Kafka中

文章目录 1、Kafka作为Source【数据进入到kafka中&#xff0c;抽取出来】2、kafka作为Sink 【数据从别的地方抽取到kafka里面】 1、Kafka作为Source【数据进入到kafka中&#xff0c;抽取出来】 kafka源 --> memory --> 控制台&#xff1a; a1.sources r1 a1.sinks k1…...

Flutter:启动屏逻辑处理02:启动页

启动屏启动之后&#xff0c;制作一个启动页面 新建splash&#xff1a;view 视图中只有一张图片sliding.png就是我们的启动图 import package:flutter/material.dart; import package:get/get.dart; import index.dart; class SplashPage extends GetView<SplashController…...

【MySQL】自动刷新flush privileges命令

在 MySQL 中&#xff0c;执行 FLUSH PRIVILEGES 命令的主要作用是使权限表中的更改立即生效。下面是关于这个命令的一些关键点&#xff1a; 1. 什么是 FLUSH PRIVILEGES 当你使用 SET PASSWORD 或其他 SQL 语句直接修改了用户的密码或权限&#xff08;例如&#xff0c;使用 U…...

2024免费天气接口(无废话版)

免费接口1&#xff1a;http://t.weather.sojson.com/api/weather/city/101030100 免费接口2&#xff1a;http://t.weather.itboy.net/api/weather/city/101030100 至于后面那个城市编码 请自行查询&#xff1a;如图 注意&#xff01;&#xff01;&#xff01; 点击下载时 可能…...

fpga 时序分析基础

目录 触发器的动态参数 同步时序电路分析 1. 时钟脉冲的特性 2. 同步时序电路分析 Timing Analyzer的应用 异步时序与亚稳态问题 时序分析就是对时序电路进行时序检查&#xff0c;通过分析电路中所有寄存器之间的路径延迟以检查电路的传输延迟是否会导致触发器的建立时间…...

Laravel8.5+微信小程序实现京东商城秒杀方案

一、商品秒杀涉及的知识点 鉴权策略封装掊口访问频次限制小程序设计页面防抖接口调用订单创建事务使用超卖防御 二、订单库存系统方案&#xff08;3种&#xff09; 下单减库存 优点是库存和订单的强一致性&#xff0c;商品不会卖超&#xff0c;但是可能导致恶意下单&#xff…...

Git——本地仓库链接并推送到多个远程仓库

步骤 1. 新建仓库init 或 删除已有仓库远程链接 // 1.新建init git init// 2.已有仓库&#xff0c;查看链接的远程仓库 git remote -v// 3.已有远程连接仓库&#xff0c;需要删除连接 git remote rm origin(或对应远程仓库名) 2.新建远程仓库 在gitee、github等托管平台创建…...

llama-factory 系列教程 (七),Qwen2.5-7B-Instruct 模型微调与vllm部署详细流程实战

文章目录 介绍llama-factory 安装装包下载模型 微调模型数据集训练模型 微调后的模型推理 介绍 时隔已久的 llama-factory 系列教程更新了。本篇文章是第七篇&#xff0c;之前的六篇&#xff0c;大家酌情选看即可。 因为llama-factory进行了更新&#xff0c;我前面几篇文章的实…...

Spring Boot的理解

一、什么是Spring Boot? Spring Boot是一个用于构建基于Spring框架的应用程序的开源框架。它简化了Spring应用程序的开发过程&#xff0c;使开发者能够更容易地创建独立运行的、生产级别的Spring应用程序。Spring Boot提供了许多功能和约定&#xff0c;可以帮助开发者快速搭建…...

QT QFormLayout控件 全面详解

本系列文章全面的介绍了QT中的57种控件的使用方法以及示例,包括 Button(PushButton、toolButton、radioButton、checkBox、commandLinkButton、buttonBox)、Layouts(verticalLayout、horizontalLayout、gridLayout、formLayout)、Spacers(verticalSpacer、horizontalSpacer)、…...

系统性能定时监控PythonLinux

系统性能定时监控 1.系统监控概述 ⽤Python来编写脚本简化⽇常的运维⼯作是Python的⼀个重要⽤途。在Linux下&#xff0c;有许多系统命令可以让我们时刻监控系统运⾏的状态&#xff0c;如 ps &#xff0c; top &#xff0c; free 等等。要获取这些系统信息&#xff0c;Python…...

python学习——列表的相关操作

在 Python 中&#xff0c;列表&#xff08;list&#xff09;是一种非常灵活的数据结构&#xff0c;可以用来存储一系列的元素。以下是列表的一些常见操作&#xff1a; 文章目录 创建列表访问元素修改元素列表切片添加元素删除元素列表推导式其他操作pop基本用法指定索引使用场…...

HTML CSS 魔法秀:打造翻转卡片登录注册页面

这段 HTML 和 CSS 代码创建了一个具有翻转卡片效果的登录和注册页面。下面是对重点标签和 CSS 样式的解释和总结&#xff1a; 一键复制 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"…...

Web day04 SpringBoot

目录 1.Spring概念&#xff1a; 2. spring程序快速入门&#xff1a; 3.HTTP协议&#xff1a; 特点&#xff1a; 基于TCP 协议&#xff1a; 基于请求响应模型&#xff1a; HTTP协议是无状态协议&#xff1a; 请求协议&#xff1a;为浏览器向服务器发出的消息 获取请求数据…...

selinux和防火墙实验

1 、 selinux 的说明 SELinux 是 Security-Enhanced Linux 的缩写&#xff0c;意思是安全强化的 linux 。 SELinux 主要由美国国家安全局&#xff08; NSA &#xff09;开发&#xff0c;当初开发的目的是为了避免资源的误用。 系统资源都是通过程序进行访问的&#xff0c;如…...

ClamAV 在 CentOS 的开发版本 `clamav-devel`

是的&#xff0c;ClamAV 在 CentOS 上有开发版本&#xff08;通常称为 clamav-devel&#xff09;&#xff0c;它包含了开发 ClamAV 应用程序所需的头文件和库文件。以下是如何在 CentOS 上安装 ClamAV 及其开发版本的步骤。 ### 1. **安装 EPEL 仓库** ClamAV 通常在 EPEL&am…...

C++《二叉搜索树》

在初阶数据结构中我学习了树基础的概念以及了解了顺序结构的二叉树——堆和链式结构二叉树该如何实现&#xff0c;那么接下来我们将进一步的学习二叉树&#xff0c;在此会先后学习到二叉搜索树、AVL树、红黑树&#xff1b;通过这些的学习将让我们更易于理解后面set、map、哈希等…...

⭐️ GitHub Star 数量前十的工作流项目

文章开始前&#xff0c;我们先做个小调查&#xff1a;在日常工作中&#xff0c;你会使用自动化工作流工具吗&#xff1f;&#x1f64b; 事实上&#xff0c;工作流工具已经变成了提升效率的关键。其实在此之前我们已经写过一篇博客&#xff0c;跟大家分享五个好用的工作流工具。…...

uni-app中的样式尺寸单位,px,rpx,vh,vw

uni-app 支持less、sass、scss、stylus等预处理器。 尺寸单位 uni-app 支持的通用 css 单位包括 px、rpx px 即屏幕像素rpx 即响应式 px&#xff0c;一种根据屏幕宽度自适应的动态单位。以 750 宽的屏幕为基准&#xff0c;**750rpx 恰好为屏幕宽度。**屏幕变宽&#xff0c;r…...

跳表(Skip List)

跳表&#xff08;Skip List&#xff09; 跳表是一种用于快速查找、插入和删除的概率型数据结构&#xff0c;通常用于替代平衡二叉搜索树&#xff08;如 AVL 树或红黑树&#xff09;。跳表通过在有序链表的基础上增加多层索引&#xff0c;使得查找操作的平均时间复杂度降低&…...

103.【C语言】数据结构之建堆的时间复杂度分析

1.向下调整的时间复杂度 推导 设树高为h 发现如下规律 按最坏的情况考虑(即调整次数最多) 第1层,有个节点,最多向上调整h-1次 第2层,有个节点,最多向上调整h-2次 第3层,有个节点,最多向上调整h-3次 第4层,有个节点,最多向上调整h-4次 ... 第h-1层,有个节点,最多向上调整1次 第…...

数字信号处理实验报告四:IIR数字滤波器设计及软件实现

1.实验目的 (1)熟悉用双线性变换法设计IIR数字滤波器的原理与方法; (2)学会调用MATLAB信号处理工具箱中滤波器设计函数(或滤波器设计分析工具fdatool)设计各种IIR数字滤波器,学会根据滤波需求确定滤波器指标参数。 (3)掌握IIR数字滤波器的MATLAB实现方法。 (3)…...

Flutter:encrypt插件 AES加密处理

1、pubspec.yaml导入插件 cupertino_icons: ^1.0.8 # 密码加密 encrypt: 5.0.3encrypt封装 import package:encrypt/encrypt.dart; /// 加密类 class EncryptUtil {static final EncryptUtil _instance EncryptUtil._internal();factory EncryptUtil() > _instance;Encrypt…...

软银集团孙正义再度加码OpenAI,近屿智能专注AI人才培养

11月28日凌晨&#xff0c;全球最大财经CNBC报道&#xff0c;软银集团创始人兼CEO孙正义再次向人工智能领域的领军企业OpenAI投资了15亿美元。软银对OpenAI的投资已不是首次。就在上个月&#xff0c;软银已在OpenAI的上一轮融资中注入了5亿美元的资金。但他一直寻求获得OpenAI更…...

windows11下的Ubuntu(WSL)中安装界面测试ROS

症状&#xff1a;我在WSL(Ubuntu)中我自己的用户名下面安装好了ROS,输入命令行能用&#xff0c;就是不弹出窗口。 首先到windows应用商店安装Ubuntu,我这里安装的是20.04,然后安装对应的ROS&#xff08;Noetic版本&#xff09;. 然后windows安装VcXsrv. Ubuntu安装xfce4。 …...

Stable Diffusion 3详解

&#x1f33a;系列文章推荐&#x1f33a; 扩散模型系列文章正在持续的更新&#xff0c;更新节奏如下&#xff0c;先更新SD模型讲解&#xff0c;再更新相关的微调方法文章&#xff0c;敬请期待&#xff01;&#xff01;&#xff01;&#xff08;本文及其之前的文章均已更新&…...

【CSS】设置文本超出N行省略

文章目录 基本使用 这种方法主要是针对Webkit浏览器&#xff0c;因此可能在一些非Chrome浏览器中不适用。 基本使用 例如&#xff1a;设置文本超出两行显示省略号。 核心代码&#xff1a; .ellipsis-multiline {display: -webkit-box; -webkit-box-orient: vertical; /* 设置…...

Python绘画:蛋糕

Python绘画&#xff1a;蛋糕 &#x1f438; 前言 &#x1f438;&#x1f40b; 效果图 &#x1f40b;&#x1f409; 代码 &#x1f409; &#x1f335;&#x1f332;&#x1f333;&#x1f334;&#x1f33f;&#x1f340;☘️&#x1f331;&#x1f343;&#x1f38b;&#x1f…...

使用wget在清华镜像站下载Anaconda报错ERROR 403: Forbidden.

问题描述 使用wget在清华镜像站下载Anaconda报错ERROR 403: Forbidden. Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)… 101.6.15.130, 2402:f000:1:400::2 Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.15…...

道可云人工智能元宇宙每日资讯|第三届京西地区发展论坛成功召开

道可云元宇宙每日简报&#xff08;2024年11月27日&#xff09;讯&#xff0c;今日元宇宙新鲜事有&#xff1a; 工信部等十二部门印发《5G规模化应用“扬帆”行动升级方案》 11月25日&#xff0c;工业和信息化部等十二部门印发《5G规模化应用“扬帆”行动升级方案》。《方案》…...

web安全之信息收集

在信息收集中,最主要是就是收集服务器的配置信息和网站的敏感信息,其中包括域名及子域名信息,目标网站系统,CMS指纹,目标网站真实IP,开放端口等。换句话说,只要是与目标网站相关的信息,我们都应该去尽量搜集。 1.1收集域名信息 知道目标的域名之后,获取域名的注册信…...

Google Earth Engine APP(GEE) ——基于多种机器学习多源遥感不同变量组合下的森林地表生物量模型预测APP

目录 Arguments: Returns: ui.Select Arguments: Returns: ui.Chart Arguments: Returns: ui.Chart Arguments: Returns: Classifier Arguments: Returns: Classifier Arguments: Returns: Classifier 本代码的主要功能是将我们提前准备好的森林生物量样本点上传到…...

Redis开发02:redis.windows-service.conf 默认配置文件解析与注解

文件位置&#xff1a;redis安装目录下的 redis.windows-service.conf &#xff0c;存放了redis服务的相关配置&#xff0c;下面列举出默认配置的含义&#xff1a; 配置项含义bind 127.0.0.1限制 Redis 只监听本地回环地址&#xff0c;意味着只能从本地连接 Redis。protected-m…...

webrtc 3A移植以及实时处理

文章目录 前言一、交叉编译1.Pulse Audio webrtc-audio-processing2.交叉编译 二、基于alsa进行实时3A处理1.demo源码2.注意项3.效果展示 总结 前言 由于工作需要&#xff0c;硬件3A中的AEC效果实在太差&#xff0c;后面使用SpeexDSP的软3A&#xff0c;效果依旧不是很好&#…...

Android so库的编译

在没弄明白so库编译的关系前,直接看网上博主的博文,常常会觉得云里雾里的,为什么一会儿通过Android工程cmake编译,一会儿又通过NDK命令去编译。两者编译的so库有什么区别? android版第三方库编译总体思路: 对于新手小白来说搞明白上面的总体思路图很有必…...

Reachy 2,专为AI与机器人实验室打造的卓越开源双臂移动操作平台!

近期&#xff0c;花粉机器人&#xff08;POLLEN ROBOTICS&#xff09;隆重推出Reachy 2仿生机器人——下一代开源操作平台&#xff0c;为AI与机器人实验室带来理想的双臂移动操作科研平台&#xff01; Reachy 2的仿生性&#xff1a; 》拥有两个基于Maxon无刷电机的仿生7自由度…...

Jest 测试异步函数

异步编程的发展历史 异步函数,就不用我描述了,JS是单线程的,所以没有办法处理异步问题,但是可以通过其他的机制实现 回调函数 例如,我们写一个定时器,在函数fetchData中,有一个延时处理的函数,但是,你有不能等他,如果他是一年呢? 所以,我们给他一个回调函数,来等他执行完返回处…...

linux安全管理-防火墙配置

1. 开启系统防火墙 1、检查内容 检查操作系统是否开启防火墙&#xff1b; 2、配置要求 操作系统开启防火墙&#xff1b; 3、配置方法 systemctl status firewalld ##查看系统防火墙运行状态 systemctl start firewalld ##启动防火墙 systemctl restart firewalld ##重启防火墙…...

Blender 运行python脚本

Blender 运行python脚本 步骤 1&#xff1a;打开 Blender 首先&#xff0c;打开 Blender 软件。你可以从官方网站 [blender.org]( 下载最新的 Blender 版本&#xff0c;并按照安装向导进行安装。 步骤 2&#xff1a;打开“文本编辑器”面板 在 Blender 的默认布局中&#xff…...

Three.js CSS2D/CSS3D渲染器

在Three.js开发过程中&#xff0c;有时需要将 HTML 元素与 Three.js 渲染的 3D 场景相结合&#xff0c;这就需要用到 CSS2DRenderer 和 CSS3DRenderer。本文将详细介绍这两种渲染器的原理及其应用 一、CSS2DRenderer 渲染器 概述 CSS2DRenderer 渲染器用于在 3D 场景中渲染纯…...

centos7 yum install 失败,mirrorlist.centos.org连接不上

由于centos7停止支持,导致mirrorlist.centos.orgdns解析都是失效啦,yum命令没法安装程序. 换一个镜像源就好 sudo cp /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.bak sudo curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/…...

BGP协议路由黑洞

一、实验环境 1、分公司与运营商AS自治系统内运行IGP路由协议OSPF、RIP或静态路由&#xff0c;AS自治系统内通过IBGP路由协议建立BGP邻居关系。 2、公司AS自治系统与运营商AS自治系统间运行EBGP路由协议。 3、通过loopback建立IBGP与EBGP邻居关系&#xff0c;发挥loopback建立…...

学习ASP.NET Core的身份认证(基于Session的身份认证1)

ASP.NET Core使用Session也可以实现身份认证&#xff0c;关于Session的介绍请见参考文献5。基于Session的身份认证大致原理就是用户验证成功后将用户信息保存到Session中&#xff0c;然后在其它控制器中从Session中获取用户信息&#xff0c;用户退出时清空Session数据。百度基于…...

《Docker Registry(镜像仓库)详解》

一、引言 在容器化技术日益普及的今天&#xff0c;Docker 已成为众多开发者和企业的首选工具。而 Docker Registry&#xff08;镜像仓库&#xff09;作为 Docker 生态系统中的重要组成部分&#xff0c;负责存储和分发 Docker 镜像。本文将深入探讨 Docker Registry 的概念、功能…...

Mybatis

1 什么是MyBatis MyBatis是一个优秀的持久层框架&#xff0c;它对JDBC操作数据库的过程进行封装&#xff0c;使开发者只需要关注 SQL 本身&#xff0c;而不需要花费精力去处理例如注册驱动、创建connection、创建statement、手动设置参数、 结果集检索等JDBC繁杂的过程代码 。…...

uniapp学习(010-3 实现H5和安卓打包上线)

零基础入门uniapp Vue3组合式API版本到咸虾米壁纸项目实战&#xff0c;开发打包微信小程序、抖音小程序、H5、安卓APP客户端等 总时长 23:40:00 共116P 此文章包含第114p-116p的内容 文章目录 H5配置文件设置开始打包上传代码 安卓设置模拟器启动设置基础配置设置图标启动界面…...

IPGuard与Ping32结合,提供企业级数据加密与防泄密解决方案,全面保障敏感数据安全

随着数字化转型的深入推进&#xff0c;企业面临着日益复杂的安全挑战。如何在确保数据流通的同时&#xff0c;保障企业的核心资产不被泄露&#xff0c;是每个企业必须面对的难题。为此&#xff0c;Ping32与IPGuard联合推出了一套全面的企业级数据加密与防泄密解决方案&#xff…...