Flink 系列之十五 - 高级概念 - 窗口
之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。
注意:由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19。
代码参考:https://github.com/forever1986/flink-study.git
目录
- 1 窗口定义及分类
- 1.1 定义
- 1.2 窗口分类
- 1.3 计算函数
- 2 代码演示
- 2.1 reduce、aggregate&process演示
- 2.1.1 reduce方法演示
- 2.1.2 aggregate方法演示
- 2.1.3 process方法演示
- 2.1.4 结合使用
- 2.2 时间窗口演示
- 2.2.3 时间滚动窗口
- 2.2.2 时间滑动窗口
- 2.2.3 时间会话窗口
- 2.3 计数窗口演示
- 2.3.1 计数滚动窗口
- 2.3.2 计数滑动窗口
前面几章对Flink从数据输入到中间计算,最后到数据输出的整个流程讲了一遍。但是这些只不过是Flink最基本的内容。接下来需要更为深入的了解Flink的特性,这些特性就是体现Flink的优势。本章先来了解第一个高级概念:“窗口”
1 窗口定义及分类
1.1 定义
根据《官方文档》的描述:窗口是Flink处理无界流的核心。窗口将流分成有限大小的“桶”,可以对桶内的数据进行定制化的计算。这么说可能会比较难理解,下面通过图解来说明一下窗口的概念:
- 管道里的数据不断的流过来,Flink会根据规则生成一个桶来接住这些数据,桶的生成规则如下:
1)基于时间规则:一定范围时间内的数据放到一个桶
2)基于计数规则:一定数量的数据放到一个桶 - 当一开始没有任何数据过来时,Flink是不会生成桶的,而是第一条数据过来才会生成一个桶。也就是事件触发类型的。
- Flink也不是触发了就一下子生成很多桶,而是到来的那一条数据符合某些条件(时间或者数量条件)时,才会生成新的桶
1.2 窗口分类
通过前面的描述可以了解到关于窗口简单理解为一个按照一定规则接受数据的桶,可以根据不同规则、不同窗口分配器以及是否做KeyBy的维度进行分类:
1)根据是否KeyBy可分为:
- 进行KeyBy:则会将数据的key相同的分配到同一个子任务,并在子任务进行窗口计算,可以是由多个子任务计算不同key的数据。
- 不进行KeyBy:不会对数据进行分开,全部都由一个子任务进行计算。
2)根据不同规则可分为:
- 时间窗口:根据设定的的时间跨度进行划分窗口,比如设定5秒钟,则会5秒钟一个桶。
- 计数窗口:根据设定的计数总量进行划分窗口,比如设定100条数据,则会100条一个桶。
3)根据窗口分配器可分为:
-
滚动窗口:将数据按照时间或者计数方式划分为不重叠的窗口。每个窗口都有固定时间或者计数。
-
滑动窗口:将数据按照设定的时间或者计数+步长的方式划分窗口,每个窗口都有固定时间或者计数,但是窗口跟窗口之间存在一定重叠数据。
注意:这里需要注意2个点。
1)当输入前2条数据时,就已经会有一个窗口生成,因为前2条数据会与之前2条(假设的数据)构成一个窗口。那么也就意味着滑动窗口是会按照步长每2条数据创建一个窗口。
2)滑动的窗口与窗口之间是会重叠2条数据,也就是有2条数据会被共享。滚动窗口经常用于计算最近1个小时、1天的数据量这种场景
3)细心的朋友还发现,其实滚动窗口就是滑动窗口一个特例,当滑动窗口的步长等于窗口长度时,就是滚动窗口。
- 会话窗口:将数据按照规定多少时间内没有其它数据再进来,那么之前进来的数据归为一个窗口。每个窗口都没有固定时间,窗口与窗口之间数据不重复。但是这种方式只支持时间规则,不支持计数规则。
它们之间可以进行组合,并产生不同的作用。当然并非所有的都能组合,本来有223=12种,但是由于计数与会话窗口不能组合,因此只有下面10种组合结果:
类型 | 作用 | 子任务数 | 是否固定 | 窗口之间是否重叠数据 |
---|---|---|---|---|
不进行KeyBy+时间+滚动 | 按照设定的时间间隔,划分窗口 | 1个 | 时间固定,数量不固定 | 不重叠 |
不进行KeyBy+时间+滑动 | 按照设定的时间间隔+步长,划分窗口 | 1个 | 时间固定,数量不固定 | 重叠 |
不进行KeyBy+时间+会话 | 按照设定的时间间隔,超过该时间间隔没有数据进入, 则开启新的窗口 | 1个 | 时间不固定,数量不固定 | 不重叠 |
不进行KeyBy+计数+滚动 | 按照设定的计数,划分窗口 | 1个 | 数量固定,数据不固定 | 不重叠 |
不进行KeyBy+计数+滑动 | 按照设定的计数+步长,划分窗口 | 1个 | 固定计数 | 重叠 |
进行KeyBy+时间+滚动 | 按照设定的时间间隔,划分窗口 | 1或n个 | 时间固定,数量不固定 | 不重叠 |
进行KeyBy+时间+滑动 | 按照设定的时间间隔+步长,划分窗口 | 1或n个 | 时间固定,数量不固定 | 重叠 |
进行KeyBy+时间+会话 | 按照设定的时间间隔,超过该时间间隔没有数据进入, 则开启新的窗口 | 1或n个 | 时间不固定,数量不固定 | 不重叠 |
进行KeyBy+计数+滚动 | 按照设定的计数,划分窗口 | 1或n个 | 数量固定,数据不固定 | 不重叠 |
进行KeyBy+计数+滑动 | 按照设定的计数+步长,划分窗口 | 1或n个 | 固定计数 | 重叠 |
1.3 计算函数
如果要进行窗口计算,需要先通过windowAll或window方法进行开窗操作,开窗之后就可以使用其计算函数。通过拿AllWindowedStream类来举例子,可以看到有很多种计算方式,包括max、min、reduce、aggregate、process等方法。但其实只需要关注其中reduce、aggregate、process三个计算函数即可,因为其它方法都是基于这三个方法中的函数进行扩展的,下面说明一下这三个函数的内容:
1)reduce:ReduceFunction函数,输入、累加器以及输出的数据类型必须一致
reduce方法需要实现ReduceFunction函数:
/*** 输入、累加器以及输出的数据类型必须一致*/
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* @param value1 上一次累加的结果* @param value2 这一次进来的数据* @return 返回相同数据类型的数据*/T reduce(T value1, T value2) throws Exception;
}
2)aggregate:AggregateFunction函数,输入、累加器以及输出的数据类型可以不一致
aggregate:AggregateFunction函数:
/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* IN 输入的数据类型* ACC 累加器的数据类型* OUT 返回的数据类型*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {/*** 创建一个累加器,也就是初始化最初开始累积的值。比如你是一个计算总数的,你初始累加器就可以是0。*/ACC createAccumulator();/*** 进行累加操作,也就是计算操作。(不要被add的名称误导了,它就是写你自己的聚合逻辑)* @param value 这一次进来的数据* @param accumulator 上一次累加的结果* @return 返回累加的数据*/ACC add(IN value, ACC accumulator);/*** 输出的最终值给下游算子*/OUT getResult(ACC accumulator);/*** 一般在会话窗口中使用,用于合并窗口与窗口结果*/ACC merge(ACC a, ACC b);
}
3)process:ProcessWindowFunction和ProcessAllWindowFunction函数,这个是自定义计算函数
ProcessWindowFunction和ProcessAllWindowFunction函数之前在《系列之十一 - Data Stream API的中间算子的底层原理及其自定义》就有讲过,但是没有代码演示,这一章会进行代码演示。ProcessWindowFunction和ProcessAllWindowFunction函数其实就是自定义聚合方式,两者区别在于是否使用过keyBy操作。他们有一个process方法要实现,该方法只有窗口结束时,才会被执行。
ProcessWindowFunction函数:
/*** 进行累加的reduce方法,输入、累加器以及输出的数据类型必须一致* IN 输入的数据类型* OUT 返回的数据类型* KEY key的类型,如果是ProcessAllWindowFunction,则没有该泛型* W 开窗类型*/
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {/*** 处理聚合操作的方法* */public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
}
2 代码演示
前面讲了很多概念,可能通过字面还是很难理解,现在通过demo的方式来说明。可能就能够理解。由于做KeyBy和不做KeyBy唯一的区别就是有多少个子任务来处理开窗操作。所以这里只演示做KeyBy的示例:
以下所有代码参考lesson08子模块
2.1 reduce、aggregate&process演示
这里先对reduce、aggregate和process三个方法进行演示,本次演示大家要关注的是看看三个方法的差异:
方法 | 实现函数 | 关键函数方法 | 特性 | 执行频率 | 是否有rich函数 |
---|---|---|---|---|---|
reduce | ReduceFunction | reduce | 输入、输出和累加器的数据类型必须一致 | 每个窗口除了第一条数据之外,每来一条数据都会执行 | 有 |
aggregate | AggregateFunction | add | 输入、输出和累加器的数据类型可以不一致 | 每个窗口每来一条数据都会执行 | 有 |
process | ProcessWindowFunction或 者ProcessAllWindowFunction | process | 输入和输出的数据类型可以不一致 | 每个窗口结束时,执行一次 | 过期(非Rich函数已经能够获取上下文) |
2.1.1 reduce方法演示
示例说明:输入的cpu值在同一个窗口中累加起来。看看reduce如何被调用
ReduceDemo类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<ServerInfo>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1="+ value1 + "value2=" + value2);value1.setCpu(value1.getCpu()+value2.getCpu());return value1;}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前2条数据快速输入,第3条等一段时间后再输入
输出:
知识点:总共输入了3条数据,其中前2条数据快速输入,第3条等一段时间后再输入,结果是:reduce只打印1次,print打印2次。原因:
1)本示例中使用滚动窗口,设置时间间隔为10秒,因此第一条和第二条在第一个时间窗口,第三条在第二个时间窗口,因此print打印2次
2)reduce只输出1次,这个在《系列之八 - Data Stream API的中间算子:转换和聚合》中讲聚合类算子讲过,reduce第一条数据是不会被调用的。因此第一条数据进来时,不调用reduce;第二条数据进来时,调用reduce;第三条数据进来时,由于是新的窗口,因此它算是该窗口的第一条数据,因此也不调用reduce。
2.1.2 aggregate方法演示
示例说明:通过aggregate实现求同一个窗口下的平均cpu值
AggregateDemo类:
import com.demo.lesson08.model.ServerAvgInfo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示Aggregate方法*/
public class AggregateDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> reduce = windowStream.aggregate(new AggregateFunction<ServerInfo, ServerAvgInfo, String>() {@Overridepublic ServerAvgInfo createAccumulator() {// 初始值System.out.println("====createAccumulator====");return new ServerAvgInfo();}@Overridepublic ServerAvgInfo add(ServerInfo value, ServerAvgInfo accumulator) {System.out.println("====add====");// 累积cpu值以及条数accumulator.setServerId(value.getServerId());accumulator.setNum(accumulator.getNum()==null?1:accumulator.getNum()+1);accumulator.setCpuTotal(accumulator.getCpuTotal()==null?value.getCpu():accumulator.getCpuTotal()+ value.getCpu());return accumulator;}@Overridepublic String getResult(ServerAvgInfo accumulator) {System.out.println("====getResult====");// 平均cpu值return "平均cpu值: "+(accumulator.getCpuTotal()/accumulator.getNum());}@Overridepublic ServerAvgInfo merge(ServerAvgInfo a, ServerAvgInfo b) {System.out.println("====merge====");return null;}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前3条数据快速输入,第4条等一段时间(10秒)后再输入
输出:
知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间(10秒)后再输入。结果是:createAccumulator打印2次,add打印4次,getResult打印2次,merge打印0次,print打印2次。原因:
1)本示例使用滚动窗口,设置时间间隔为10秒,因此第一条、第二条和第三条在第一个时间窗口,第四条在第二个时间窗口,因此2个窗口print打印2次。createAccumulator和getResult也是打印2次,因此说明他们是在窗口创建和关闭时分别被调用
2)add输出4次,说明每一次数据输入,都会执行add方法。
3)支持输入、累加器以及输出都是不一样的数据类型
2.1.3 process方法演示
示例说明:通过process实现求同一个窗口下的平均cpu值
ProcessDemo类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;public class ProcessDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> reduce = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {System.out.println("该窗口的时间:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印reduce.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前3条数据快速输入,等待控制台输出process,再输入第4条数据
输出:
知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间后再输入。结果是:process打印2次,print打印2次,原因:
1)本示例使用滚动窗口,设置时间间隔为10秒,因此第一条、第二条和第三条在第一个时间窗口,第四条在第二个时间窗口,因此2个窗口print打印2次。
2)process打印2次,说明每个窗口在最后的时候process才会调用一次,并且将本次窗口的数据条数都一次性计算,并没有每条数据计算。
3)支持输入和输出都是不一样的数据类型
2.1.4 结合使用
细心的朋友还能看到reduce或者aggregate方法还有另外重载的方法,可以传入2个参数。比如reduce可以传入ReduceFunction和ProcessWindowFunction,aggregate可以传入AggregateFunction和ProcessWindowFunction。其实这是由于它们各自存在一定的缺点,为了弥补缺点,可以结合使用,各取有点。
注意:使用时是会将ReduceFunction或AggregateFunction最终的数据给ProcessWindowFunction,也就是每个窗口,ProcessWindowFunction的process方法的elements只会拿到ReduceFunction或AggregateFunction累加器最后计算得到的一条数据。
示例说明:在ReduceFunction中累积CPU,然后在ProcessWindowFunction计算平均值
ReduceAndProcessDemo 类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceAndProcessDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1=" + value1 + "value2=" + value2);value1.setCpu(value1.getCpu() + value2.getCpu());return value1;}}, new ProcessWindowFunction<>() {@Overridepublic void process(String key, ProcessWindowFunction<ServerInfo, ServerInfo, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<ServerInfo> out) throws Exception {// 平均cpu值long num = elements.spliterator().estimateSize();ServerInfo serverInfo = elements.iterator().next();System.out.println("服务器id=" + key + "在窗口的时间:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+ " 的条数=" + elements.spliterator().estimateSize()+ " 错误平均cpu值: " + (serverInfo.getCpu() / num));out.collect(serverInfo);}});// 7. 打印reduce.print("最终结果");// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:快速输入3条数据,保证3条数据在一个窗口
输出:
知识点:从上面输出结果可以看到,reduce被调用2次,这是因为总共输入3条数据。process被调用一次,而且可以看到平均值计算错误,在process中打印的结果显示获取到的条数是1条。由此可以看出process获取到的是reduce最后一条数据。
2.2 时间窗口演示
时间窗口的演示,只需要关注的是不同窗口的时间划分
2.2.3 时间滚动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
TumblingTimeWindowsDemo 类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间滚动窗口示例*/
public class TumblingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前3条数据快速输入,等待控制台输出数据之后,再输入第4条
输出:
知识点:总共输入了4条数据,其中前3条数据快速输入,第4条等一段时间后再输入。结果是:process打印2次,说明被分为2个窗口:
1)注意窗口的开始时间和结束时间,第一个窗口是从0秒-10秒,第二个窗口是从10秒-20秒
2)这说明滚动窗口,窗口之间是不会重叠的。
3)另外注意的点是,窗口时间范围是左闭右开的,也就是0秒是属于第一个窗口,但10秒不属于第一个窗口,而是属于第二个窗口
2.2.2 时间滑动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
SlidingTimeWindowsDemo 类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间滑动窗口示例*/
public class SlidingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗 - 滑动窗口,间隔为10秒,步长为5秒WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(SlidingProcessingTimeWindows.of(Duration.ofSeconds(10),Duration.ofSeconds(5)));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:先快速输入前三条,等待控制台输出之后,再输入第4条数据
输出:
知识点:从控制台可以看到process被执行了2次,说明开了2个窗口。
1)注意其每个窗口的开始时间和结束时间,比如第一个窗口是从15秒-25秒,第二个窗口是从20秒-30秒,说明在20秒-25秒是两个窗口重叠之处,也就是代码中设置的步长5秒
2)前三条数据分别被第一个窗口和第二个窗口给计算了,因此说明前3条数据输入时间落在了20秒-25秒之间。因此说明两个窗口是会重叠的。
2.2.3 时间会话窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
SessionTimeWindowsDemo类:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 时间会话窗口演示*/
public class SessionTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 开窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(// 固定间隔时间ProcessingTimeSessionWindows.withGap(Duration.ofSeconds(10))// 动态间隔时间
// ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ServerInfo>(){
// @Override
// public long extract(ServerInfo element) {
// return 0;
// }
// }));// 6. 计算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的开始时间和结束时间System.out.println("该窗口的时间:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的条数=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:前面2条数据在10秒钟之内输入,第2条数据输入后的10秒钟之后,再输入第3条数据
输出:
知识点:从控制台可以看到process被执行2次,因此它是开了2个窗口
1)前面两条数据被合并为一个窗口,因此平均值是2.5,第三条数据在第二个窗口,因此平均值2.8。
2)注意一下每个窗口的开始和结束时间,都不想之前滚动或者滑动窗口一样,都是固定的
3)会话窗口还支持动态规定间隔时间,你可以使用withDynamicGap方法,实现函数接口,在输入的数据中设置某个属性为间隔时间,实现动态的会话窗口。
2.3 计数窗口演示
2.3.1 计数滚动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
TumblingCountWindowsDemo 类:
import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 计数滚动窗口示例*/
public class TumblingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗 - 活动窗口,间隔为3条数据WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(3);// 6. 计算 - 计算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {ServerInfo next = iterator.next();sum = sum + next.getCpu();}out.collect("cpu平均值=" + (sum / num) + " 条数=" + num);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:输入前3条,再输入后3条
输出:(平均值由于精度问题不用管)
知识点:示例中设置滚动窗口,每个窗口3条数据。输入了6条数据,process调用了2次
1)这样就知道滚动窗口是每3条会新建一个窗口,窗口之间不重叠,与代码示例中设置的一样
2.3.2 计数滑动窗口
示例说明:通过process实现求同一个窗口下的平均cpu值
SlidingCountWindowsDemo 类:
import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 计数滑动窗口示例*/
public class SlidingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做类型转换SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 开窗 - 活动窗口,间隔为3条数据WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(4,2);// 6. 计算 - 计算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {sum = sum + iterator.next().getCpu();}out.collect("cpu平均值=" + (sum / num) + " 条数=" + num);}});// 7. 打印process.print();// 执行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一个值,作为keyBy的分类return value.getServerId();}}
}
输入:先输入2条,等控制台输出后再输入第3条和第4条,等控制台输出后再输入第5条和第6条
输出:
知识点:总共输入6条数据,先输入2条,等一下再输入第3条和第4条,等一下再输入第5条和第6条。process调用了3次
1)可以看出它是会按照步长来调用process,这是因为第1条和第2条会是前一个窗口与当前窗口重叠
结语:本章说明了不同的窗口类型,还通过代码演示,更深刻了解窗口的工作原理。接下来一章将接触更底层的Flink如何实现不同窗口的。
相关文章:
Flink 系列之十五 - 高级概念 - 窗口
之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,…...
控制台打印带格式内容
1. 场景 很多软件会在控制台打印带颜色和格式的文字,需要使用转义符实现这个功能。 2. 详细说明 2.1.转义符说明 样式开始:\033[参数1;参数2;参数3m 可以多个参数叠加,若同一类型的参数(如字体颜色)设置了多个&…...
Linux为啥会重新设置中断请求号与中断向量号之间的关系?
Linux内核重新设置中断请求号(IRQ)与中断向量号之间的关系,主要出于以下核心原因和设计考量: 1. 硬件多样性与抽象需求 硬件中断号(HW Interrupt ID)的差异 不同处理器架构的中断控制器(…...
自然语言处理NLP中的连续词袋(Continuous bag of words,CBOW)方法、优势、作用和程序举例
自然语言处理NLP中的连续词袋(Continuous bag of words,CBOW)方法、优势、作用和程序举例 目录 自然语言处理NLP中的连续词袋(Continuous bag of words,CBOW)方法、优势、作用和程序举例一、连续词袋( Cont…...
计算机网络笔记(二十二)——4.4网际控制报文协议ICMP
4.4.1ICMP报文的种类 ICMP(Internet Control Message Protocol)是IP协议的辅助协议,主要用于传递控制消息、错误报告和诊断信息。其报文分为两大类:查询报文和错误报告报文。 1. 错误报告报文(Error Messages&#x…...
【AI论文】作为评判者的感知代理:评估大型语言模型中的高阶社会认知
摘要:评估大型语言模型(LLM)对人类的理解程度,而不仅仅是文本,仍然是一个开放的挑战。 为了弥合这一差距,我们引入了Sentient Agent作为评判者(SAGE),这是一个自动评估框…...
Kubernetes生产实战(二十七):精准追踪Pod数据存储位置
在生产环境中,快速定位Pod数据的物理存储位置是运维人员的基本功。本文将揭秘Kubernetes存储系统的核心原理,并提供一套经过实战检验的定位方法体系。 一、存储架构全景图 K8S存储架构 Pod --> Volume Mount --> PVC --> PV --> Storage P…...
极新携手火山引擎,共探AI时代生态共建的破局点与增长引擎
在生成式AI与行业大模型的双重驱动下,人工智能正以前所未有的速度重构互联网产业生态。从内容创作、用户交互到商业决策,AI技术渗透至产品研发、运营的全链条,推动效率跃升与创新模式变革。然而,面对AI技术迭代的爆发期࿰…...
[SIGPIPE 错误] 一个 Linux socket 程序,没有任何报错打印直接退出程序
1. 问题 在编写一个程序的时候,当然程序很复杂,遇到了一个 Linux socket 程序,没有任何报错打印直接退出程序,但是在程序里面我有很多 error log ,在程序退出的时候完全没有打印。为了说明问题,我编写了一…...
Qt 界面优化(绘图)
目录 1. 绘图基本概念2. 绘制各种形状2.1 绘制线段2.2 绘制矩形2.3 绘制圆形2.4 绘制文本2.5 设置画笔2.6 设置画刷 3. 绘制图片3.1 绘制简单图片3.2 平移图片3.3 缩放图片3.4 旋转图片 4. 其他设置4.1 移动画家位置4.2 保存/加载画家的状态 5. 特殊的绘图设备5.1 QPixmap5.2 Q…...
AQS(AbstractQueuedSynchronizer)解析
文章目录 一、AQS简介二、核心设计思想2.1 核心设计思想回顾2.2 CLH锁队列简介2.3 AQS对CLH队列的改动及其原因 三、核心组件详解3.1 state 状态变量3.2 同步队列 (FIFO双向链表) 四、核心方法深度解析4.1 获取同步状态 (独占模式) - acquire(int arg)4.2 释放同步状态 (独占模…...
Java并发编程常见问题与陷阱解析
引言 随着计算机硬件技术的飞速发展,多核处理器已经变得普遍,Java并发编程的重要性也日益凸显。然而,多线程编程并非易事,其中充满了许多潜在的问题和陷阱。作为一名Java开发工程师,掌握并发编程的常见问题及其解决方案…...
DEEPPOLAR:通过深度学习发明非线性大核极坐标码(1)
原文:《DEEPPOLAR: Inventing Nonlinear Large-Kernel Polar Codes via Deep Learning》 摘要 信道编码设计的进步是由人类的创造力推动的,而且恰如其分地说,这种进步是零星的。极性码是在Arikan极化核的基础上开发的,代表了编码…...
Java多态详解
Java多态详解 什么是多态? 比如我们说:“驾驶一辆车”,有人开的是自行车,有人开的是摩托车,有人开的是汽车。虽然我们都说“开车”,但“怎么开”是由具体的车类型决定的:“开”是统一的动作&a…...
go程序编译成动态库,使用c进行调用
以下是使用 Go 语言打包成 .so 库并使用 C 语言调用的完整步骤: 1. Go 语言打包成 .so 库 (1)编写 Go 代码 创建一个 Go 文件(如 calculator.go),并定义需要导出的函数。导出的函数名必须以大写字母开头…...
iVX:图形化编程与组件化的强强联合
在数字化浪潮中,软件开发范式正经历着从文本到图形的革命性转变。iVX 作为国产可视化编程领域的领军者,以 “图形化逻辑 组件化架构” 的双重创新,重新定义了软件开发的效率边界。其技术突破不仅体现在开发方式的革新,更通过一系…...
华为配置篇-RSTP/MSTP实验
MSTP 一、简介二、常用命令总结三、实验 一、简介 RSTP(快速生成树协议) RSTP(Rapid Spanning Tree Protocol)是 STP 的改进版本,基于 IEEE 802.1w 标准,核心目标是解决传统 STP 收敛速度慢的问…...
端口号被占用怎么解决
windows环境下端口号被占用怎么解决 win r 快捷键打开cmd输入netstat -ano|findstr 端口号 通过这个命令找到pidtaskkill /pid pid端口号 /t /f 如下图所示 命令解读 netstat 是一个网络统计工具,它可以显示协议统计信息和当前的TCP/IP网络连接。 -a 参数告诉 nets…...
GO语言-导入自定义包
文章目录 1. 项目目录结构2. 创建自定义包3. 初始化模块4. 导入自定义包5. 相对路径导入 在Go语言中导入自定义包需要遵循一定的目录结构和导入规则。以下是详细指南(包含两种方式): 1. 项目目录结构 方法1:适用于Go 1.11 &#…...
ES常识5:主分词器、子字段分词器
文章目录 一、主分词器:最基础的文本处理单元主分词器的作用典型主分词器示例 二、其他类型的分词器:解决主分词器的局限性1. 子字段分词器(Multi-fields)2. 搜索分词器(Search Analyzer)3. 自定义分词器&a…...
NoSQL数据库技术与应用复习总结【看到最后】
第1章 初识NoSQL 1.1 大数据时代对数据存储的挑战 1.高并发读写需求 2.高效率存储与访问需求 3.高扩展性 1.2 认识NoSQL NoSQL--非关系型、分布式、不提供ACID的数据库设计模式 NoSQL特点 1.易扩展 2.高性能 3.灵活的数据模型 4.高可用 NoSQL拥有一个共同的特点&am…...
单片机-STM32部分:12、I2C
飞书文档https://x509p6c8to.feishu.cn/wiki/MsB7wLebki07eUkAZ1ec12W3nsh 一、简介 IIC协议,又称I2C协议,是由PHILP公司在80年代开发的两线式串行总线,用于连接微控制器及其外围设备,IIC属于半双工同步通信方式。 IIC是一种同步…...
【英语笔记(四)】诠释所有16种英语时态,介绍每种时态下的动词变形!!含有所有时态的的动词变形汇总表格
1 时态的单词构成 1.1 现在 1.1.1 一般现在时态 动词原形动词原形s(第三人称单数) 1.1.1.1 表达事实 I eat carrots. 我吃胡萝卜:我是吃胡萝卜这种食物的.(这个是事实陈述) The rabbit eats carrots. 兔子吃胡萝卜…...
【质量管理】什么是过程?
在文章【质量管理】谁是顾客?什么是质量链?-CSDN博客 中我们了解了什么是顾客,顾客不仅仅是企业以外的人,在企业的内部我们也有大大小小的顾客。并且我们了解了什么是质量链,企业内部的各种供给方和客户形成了质量链。…...
效率办公新工具:PDF Reader Pro V5.0功能解析与使用体验
在日常文档处理与数字办公的场景中,PDF 文件依然是主流格式之一。从合同审批、项目文档、财务报表,到技术方案和用户手册,PDF 的编辑、转换、标注、归档需求始终存在。 面对这些需求,越来越多用户希望有一款功能完整、跨平台、智…...
Java对象内存布局和对象头
1、面试题 1)说下JUC,AQS的大致流程 CAS自旋锁,是获取不到锁就一直自旋吗? 2)CAS和synchronized区别在哪里,为什么CAS好,具体优势在哪里? 3)sychro…...
Vue 跨域解决方案及其原理剖析
在现代 Web 开发中,跨域问题是前端开发者经常面临的挑战之一。当使用 Vue.js 构建应用时,跨域请求的处理尤为重要。本文将深入探讨 Vue 解决跨域的多种方法及其背后的原理,帮助开发者更好地理解和应对这一常见问题。 一、跨域问题概述 1. 同…...
TikTok 互动运营干货:AI 助力提升粘性
在 TikTok 运营的众多环节中,与用户的互动是建立紧密联系、提升账号粘性的关键所在。及时且真诚地回复评论和私信,能让用户切实感受到你的关注与尊重,从而极大地增强他们对你的好感与粘性。对于用户提出的问题,要以耐心、专业的态…...
Kids A-Z安卓版:儿童英语启蒙的优质选择
Kids A-Z安卓版 是一款由北美知名分级读物厂商 Learning A-Z 官方推出的英语分级学习应用,也被称为 Raz-Kids app。它专为 K-5 年级的学生设计,提供丰富的英语学习资源和互动学习体验,帮助孩子们在轻松愉快的环境中提升英语能力。通过动画、互…...
接口继承与扩展的使用技巧
在 TypeScript 中,接口继承和扩展是非常强大且灵活的功能,可以帮助我们更高效地管理类型和提高代码的可重用性。接口继承使得一个接口可以从另一个接口继承属性和方法,而接口扩展允许我们通过组合多个接口来构建更复杂的结构。这些特性使得 T…...
【React】Craco 简介
Craco 简介 Craco (Create React App Configuration Override) 是一个用于自定义 Create React App (CRA) 配置的工具,无需 eject(弹出)项目。 为什么需要 Craco Create React App 虽然提供了零配置的 React 开发体验,但其配置…...
HTML5中的Microdata与历史记录管理详解
Microdata 简介 Microdata 是 HTML5 引入的一种标记方式,用于在网页中嵌入机器可读的语义信息。通过使用 Microdata,开发者可以在 HTML 元素中添加特定的属性,以便搜索引擎和其他工具更好地理解网页内容。 Microdata 的核心属性包括 itemsc…...
UNet网络 图像分割模型学习
UNet 由Ronneberger等人于2015年提出,专门针对医学图像分割任务,解决了早期卷积网络在小样本数据下的效率问题和细节丢失难题。 一 核心创新 1.1对称编码器-解码器结构 实现上下文信息与高分辨率细节的双向融合 如图所示:编码器进行了4步&…...
Babel 深度解析:现代 JavaScript 开发的桥梁
1. 什么是 Babel? Babel 是一个 JavaScript 编译器(又称转译器),核心使命是解决 JavaScript 的环境兼容性问题。它允许开发者使用最新的语言特性(如 ES6、JSX、TypeScript),同时将代码转换为旧…...
MyBatis源码解读2(2.1、核心对象)
二、MyBatis的核心对象 2.1、核心对象 2.1、MappedStatement MyBatis其实是对JDBC的进一步封装,我们都知道JDBC有几个重要的对象: StatementPrepared StatementCallable StatementResultSet Statement、Prepared Statement、Callable Statement分别…...
03.three官方示例+编辑器+AI快速学习webgl_animation_multiple
本实例主要讲解内容 这个示例展示了Three.js中骨骼动画的高级应用技巧,重点演示了如何使用SkeletonUtils.clone()方法复制模型,并展示了两种不同的骨骼动画管理方式: 独立骨骼模式:每个模型拥有独立的骨骼结构,可播放…...
无锁秒杀系统设计:基于Java的高效实现
引言 在电商促销活动中,秒杀场景是非常常见的。为了确保高并发下的数据一致性、性能以及用户体验,本文将介绍几种不依赖 Redis 实现的无锁秒杀方案,并提供简化后的 Java 代码示例和架构图。 一、基于数据库乐观锁机制 ✅ 实现思路…...
MyBatis快速入门——实操
默认:电脑搭建好了Maven环境 本次入门实验使用的idea版本:ideaU2022.1 目录 一:前期准备工作 1. 创建一个springboot工程 2. Maven环境配置 3. 在mysql数据库中创建一个user表 4. 编写实体类User 二: 引入MyBatis的相关依赖…...
假如你的项目是springboot+vue怎么解决跨域问题
1. 前端代理(开发环境推荐) 适用场景:Vue 开发环境调试时,避免直接请求后端接口的跨域问题。 实现步骤: 在 Vue 项目的 vue.config.js 中配置代理: module.exports {devServer: {proxy: {/api: { // 代理…...
OpenResty反向代理
通过在 OpenResty 的配置文件中定义不同的 location 块,将匹配特定 URL 路径的请求转发到不同的后端 FastAPI 应用(即使它们运行在不同的端口或甚至是不同的服务器/容器上)。 核心思路: 多个 FastAPI 应用实例: 你的每…...
《Effective Python》第1章 Pythonic 思维详解——深入理解 Python 条件表达式(Conditional Expressions)
《Effective Python》第1章 Pythonic 思维详解——深入理解 Python 条件表达式(Conditional Expressions) 在 Python 中,条件表达式(conditional expressions)提供了一种简洁的方式来在一行中实现 if/else 的逻辑。它…...
【Typenum】 3 类型位运算(bit.rs)
一、源码 代码定义了一个类型级别的位(bit)系统,主要用于编译时的类型运算。 //! 类型级比特位实现 //! //! 这些是基础的比特位类型,作为本库中其他数值类型的构建基础 //! //! 已实现的**类型运算符**: //! //! - …...
python:trimesh 用于 STL 文件解析和 3D 操作
python:trimesh 是一个用于处理三维模型的库,支持多种格式的导入导出,比如STL、OBJ等,还包含网格操作、几何计算等功能。 Python Trimesh 库使用指南 安装依赖库 pip install trimesh Downloading trimesh-4.6.8-py3-none-any.w…...
stm32week15
stm32学习 十一.中断 2.NVIC Nested vectored interrupt controller,嵌套向量中断控制器,属于内核(M3/4/7) 中断向量表:定义一块固定的内存,以4字节对齐,存放各个中断服务函数程序的首地址,中断向量表定…...
数据库分库分表实战指南:从原理到落地
1. 为什么要分库分表? 1.1 单库瓶颈表现 存储瓶颈:单表数据超过5000万行,查询性能急剧下降性能瓶颈:单库QPS超过5000后响应延迟显著增加可用性风险:单点故障导致全系统不可用 1.2 突破性优势 --------------------…...
雷达工程师面试题目
雷达工程师面试题目 一、基础知识类 简述雷达的工作原理 请从电磁波的发射、传播、反射以及回波接收处理等环节,详细阐述雷达如何实现对目标的探测、定位与跟踪。 常见雷达体制及其特点 列举至少三种常见的雷达体制(如脉冲雷达、连续波雷达、相控阵雷达等),并分别说明…...
JVM-类加载子系统
最近在学习JVM,分模块整理一下JVM的笔记 目录 类加载子系统 一、加载 二、链接 1.验证 2.准备 3.解析 三、初始化 类加载子系统 类加载子系统负责将字节码文件加载到虚拟机中,我们正常编写完一个Java类并在前端编译器编译后会生成一个对应的字节码…...
从0开始学习大模型--Day06--大模型的相关网络架构
云服务器 在平时,我们总能听到诸如用服务器跑数据、模型,或者是搭建服务器之类的话,实际上,它相当于一台算力、内存、运行内存等各个方面都很强大的电脑,只需要我们用自己的电脑通过互联网链接他就能使用它࿰…...
控制LED灯设备
本章分别使用C库和系统调用的文件操作方式控制开发板的LED灯,展示如何在应用层通过系统提供的设备文件控制相关硬件。 本章的示例代码目录为:base_code/linux_app/led/sys_class_leds。 9.1. LED子系统 在Linux系统中,绝大多数硬件设备都有…...
Three.js + React 实战系列 - 联系方式提交表单区域 Contact 组件✨(表单绑定 + 表单验证)
对个人主页设计和实现感兴趣的朋友可以订阅我的专栏哦!!谢谢大家!!! 在现代网页中,一个精致的 Contact 区域不仅仅是表单的堆砌,更是用户与我们建立联系的第一印象。 在本节课中,我…...