当前位置: 首页 > news >正文

【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序

DataStream编程模型之 窗口的划分-时间概念-窗口计算程序

1. 窗口的划分

1.1 窗口分为:基于时间的窗口 和 基于数量的窗口

基于时间的窗口:基于起始时间戳终止时间戳来决定窗口的大小
基于数量的窗口:根据固定的数量定义窗口 的大小

这里我看到的都是只有基于时间的窗口做的划分,没有数量的,发现运用数量窗口划分也比较少,因此很多地方都省略了。

Count Window 也有滚动窗口、滑动窗口等,可以借鉴Flink实战之CountWindowr的滚动窗口、滑动窗口WindowsAPI使用示例
接下来就开始介绍基于时间的窗口

1.2 在Flink中窗口的设定和数据本身无关,而是系统事先定义好的

窗口是Flink划分数据一个基本单位,窗口的划分方式是默认会根据自然时间划分,并且划分方式是前闭后开。
如图:窗口的划分,左闭右开
左闭右开

2.时间概念

流数据中,数据具有时间属性。Flink根据时间的产生时间把时间划分为3中类型:1.事件生成时间(Event time)2.事件接入时间 (Ingestion Time)3.事件处理时间 (Processing Time)
可以借鉴下图理解:
在这里插入图片描述

1.基站产生数据,分区传入Flink数据源
2.传入数据的时间 IngstionTime
3.划分窗口进行处理时间 ProcessingTime

2.1 事件生成时间

是每个独立事件在产生它的设备上发生的时间,这个时间通常在事件进入Flink前就已经进入到事件当中了,也就是说,事件时间是从原始的消息中提取到的。比如 Kafka消息,每个生成的消息中自带一个时间戳代表每条数据的产生时间.

2.2事件接入时间

是数据进入Flink系统的时间,它主要依赖于其数据源算子所在主机的系统时钟。理论上,接入时间处于事件时间和处理时间之间。接入时间不能处理乱序问题或者延迟数据。如果需要处理此类问题,建议使用事件时间

2.3 事件处理时间

是指数据在操作算子计算过程中获取到的所在主机时间,这个时间是由Flink系统自己提供的。这种处理时间方式实时性是最好的,但计算结果未必准确,主要用于时间计算精度要求不是特别高的计算场景,比如延时比较高的日志数据

2.4事件时间和处理时间区别

这里的时间方便了解,比如事件时间,一个在米国产生的时间,一个在中国产生的时间,这两个有时差 不一样,但是数据世界是一样的,应该是从1970计算的时间戳。
在这里插入图片描述
在Flink初始化流式运行环境时,会设置流处理的时间特性

//设置执行环境 
val env = StreamExecutionEnvironment.getExecutionEnvironment 
//把时间特性设置为“事件时间” 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //或者,把时间特性设置为“处理时间” 
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

3.窗口计算

3.1 窗口计算的程序结构

3.1.1.分组

分为:分组窗口和非分组窗口
看一组图:
左侧为分组窗口,右侧不分组。
分组窗口

3.2窗口的计算过程

如图
在这里插入图片描述

在窗口计算时,需要将数据流先分组 keyby,再分窗口。那么在写程序的时候也是先进行这两步,当窗口程序计算完成后(ruduce,aggregate,process等),又变为DataStream。

分组数据流程序结构如下:

dataStream.keyBy(...)         //是分组数据流.window(...)          //指定窗口分配器类型[.trigger(...)]           //指定触发器类型(可选)[.evictor(...)]           //指定驱逐器或者不指定(可选)[.allowedLateness()]    //指定是否延迟处理数据(可选).reduce/fold/apply()   //指定窗口计算函数

非分组数据流程序结构如下:

dataStream.windowAll(...)      //指定窗口分配器类型[.trigger(...)]           //指定触发器类型(可选)[.evictor(...)]           //指定驱逐器或者不指定(可选)[.allowedLateness()]     //指定是否延迟处理数据(可选).reduce/fold/apply()    //指定窗口计算函数

不分组理解为所有数据为一个窗口。 windowAll(…)

3.2 窗口分配器

窗口分配器是负责将每一个到来的元素分配给一个或者多个窗口。
Flink提供预定义窗口分配器
在这里插入图片描述
窗口分配器在程序就一行可以了。

这几个窗口可以理解为火车站/汽车站/飞机场的屏幕,假设有25条消息,一页显示10条消息。
滚动:一页显示10条消息,滚动一下,下一页 11-20,滚动 21-25.
滑动:这里涉及一次滑动步长(假设为1),1-10 滑动 ,2-11滑动,3-12 …

3.2.1滚动窗口

滚动窗口是根据固定时间或大小对数据流进行切分,且窗口和窗口之间的元素不会重叠
在这里插入图片描述
DataStream API提供了两种滚动窗口类型,
即基于事件时间的滚动窗口(TumblingEventTimeWindows)和

基于处理时间的滚动窗口(TumblingProcessingTimeWindows),

二者对应的窗口分配器分别为TumblingEventTimeWindows
和TumblingProcessingTimeWindows。

窗口的长度

窗口的长度可以用org.apache.flink.streaming.api.windowing.time.Time中的
seconds、minutes、hours和days来设置。

3.2.1.1 滚动窗口的实例

1.事件 时间 滚动 ,窗口大小5秒
关键词:TumblingEventTimeWindows

val dataStream: DataStream[T] = ...//基于事件时间的滚动窗口,窗口大小为5秒钟
dataStream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).<window function>(...)

2.基于处理时间的滚动
关键词:TumblingProcessingTimeWindows

//基于处理时间的滚动窗口,窗口大小为5秒钟
dataStream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<window function>(...)

3.事件时间的滚动窗口,窗口大小为1小时,偏移量为15分钟
偏移量调整窗口开始时间的数字。比如就会从整点的15分,30分,45分,00分开始,允许数据进行移位,用于时效性不强的数据。

//基于事件时间的滚动窗口,窗口大小为1小时,偏移量为15分钟
dataStream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))).<window function>(...)

还可以使用快捷方法 timeWindow() 来定义TumblingEventTimeWindowsTumblingProcessingTimeWindows,举例如下:

dataStream.keyBy(...).timeWindow(Time.seconds(1)).<window function>(...)

如果使用的是timewindow,那么就没说明是-事件时间-还是-处理时间。窗口类型就要根据程序中设置的TimeCharacteristic的值来决定。

当我们在程序中设置了env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)时,Flink会创建TumblingEventTimeWindows

当设置了env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)时,Flink会创建TumblingProcessingTimeWindows
认识英文单词就能很好的理解啦。

3.2.2滑动窗口

滑动窗口有重叠。(就是大屏幕的一个个向下滑动那种)
在这里插入图片描述

3.2.2.1 滑动窗口的实例

继续学习英文单词 slide :滑动(v) 它还有ppt幻灯片的意思。
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) 里面得有两个参数,一个是窗口大小,一个是滑动步长。

不一样的就是处理时间和事件时间。

val dataStream: DataStream[T] = ...//基于事件时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<window function>(...)

//基于处理时间的滑动窗口,窗口大小为10秒,滑动步长为5秒
dataStream.keyBy(<...>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<window function>(...)

这个是参数多了偏移量。

//基于处理时间的滑动窗口,窗口大小为12小时,滑动步长为1小时,偏移量为8小时
dataStream.keyBy(<...>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<window function>(...)
3.2.2.2 滑动步长与窗口大小的关系

在这里插入图片描述

3.2.3 会话窗口

会话窗口根据会话间隙(Session Gap)切分不同的窗口,当一个窗口在大于会话间隙的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。
在这里插入图片描述
接下来再来两个实例
看代码的withgap,中间的gap时间。

val input: DataStream[T] = ...//基于事件时间的会话窗口,会话间隙为10分钟
input.keyBy(...).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<window function>(...)//基于处理时间的会话窗口,会话间隙为10分钟
input.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<window function>(...)

3.3窗口的计算函数

在Flink的窗口计算程序中,在确定了窗口分配器以后,接下来就要确定窗口计算函数,从而完成对窗口内数据集的计算。

Flink提供了四种类型的窗口计算函数,分别是

1. ReduceFunction、
2. AggregateFunction、
3. FoldFunction
4. ProcessWindowFunction。

根据计算原理,ReduceFunction、AggregateFunction和FlodFunction属于增量聚合函数,而ProcessWindowFunction则属于全量聚合函数(这里处理的是window 那么就是整个窗口了,就是全量了)。

3.3.1 ReduceFunction

ReduceFunction定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合计算,然后输出类型相同的一个结果元素。

从这句话可以理解,先keyBy,再将同组的聚合计算。

接下来看代码

import java.util.Calendar
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Randomcase class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ReduceWindowFunctionTest {def main(args: Array[String]) {//设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度
env.setParallelism(1)//设置为处理时间
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//创建数据源,股票价格数据流val stockPriceStream: DataStream[StockPrice] = env//该数据流由StockPriceSource类随机生成.addSource(new StockPriceSource)
//确定针对数据集的转换操作逻辑val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))//打印输出
sumStream.print()//程序触发执行env.execute("ReduceWindowFunctionTest")}class StockPriceSource extends RichSourceFunction[StockPrice]{var isRunning: Boolean = trueval rand = new Random()//初始化股票价格var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)var stockId = 0var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {while (isRunning) {//每次从列表中随机选择一只股票stockId = rand.nextInt(priceList.size)val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05priceList = priceList.updated(stockId, curPrice)val curTime = Calendar.getInstance.getTimeInMillis//将数据源收集写入SourceContextsrcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))Thread.sleep(rand.nextInt(1000))}}override def cancel(): Unit = {isRunning = false}}
}

分析代码:
具体看这几行

   val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

现根据ID分组变成Keyedstream,再分窗口,然后reduce聚合同ID的price。

使用Maven工具对程序进行编译打包,然后提交到Flink中运行,在运行日志中可以看到类似如下的输出结果:

StockPrice(stock_1,1602036130952,39.78897954489408)
StockPrice(stock_4,1602036131741,49.950455275162945)
StockPrice(stock_2,1602036132184,30.073529000410154)
StockPrice(stock_3,1602036133154,79.88817093404676)
StockPrice(stock_0,1602036133919,9.957551599687758)
StockPrice(stock_1,1602036134385,39.68343765292602)
……

3.3.2 AggregateFunction

这个单词的意思就是聚合。

Flink的AggregateFunction是一个基于中间计算结果状态进行增量计算的函数。由于是迭代计算方式,所以,在窗口处理过程中,不用缓存整个窗口的数据,所以效率执行比较高。

AggregateFunction比ReduceFunction更加通用,它定义了3个需要复写的方法:

  1. add
  2. getResult
  3. merge

其中,add()定义了数据的添加逻辑,getResult()定义了累加器计算的结果,merge()定义了累加器合并的逻辑。

1.add 函数
功能:该函数用于将输入元素添加到累加器中。在聚合过程中,每当有新的数据元素流入时,Flink 会调用此函数来更新累加器的状态。

参数:add 函数通常接收两个参数,一个是当前的累加器状态,另一个是待聚合的输入元素。

返回值:该函数返回更新后的累加器状态。

2.getResult 函数

功能:该函数用于从累加器中提取聚合结果。在窗口触发或查询结束时,Flink 会调用此函数来获取最终的聚合结果。

参数:getResult 函数通常只接收一个参数,即当前的累加器状态。

返回值:该函数返回聚合后的结果,其类型通常由 AggregateFunction 的输出类型参数指定。

3.merge 函数:

功能:该函数用于在并行执行时合并两个累加器的状态。在分布式计算环境中,同一个窗口的数据可能会分配到不同的节点上进行处理。当这些节点上的聚合操作完成后,需要将它们的累加器状态合并起来以得到全局的聚合结果。

参数:merge 函数通常接收两个参数,即两个待合并的累加器状态。

返回值:该函数返回合并后的累加器状态。

这三个函数共同构成了 Flink 中 AggregateFunction 的核心逻辑。通过实现这三个函数,用户可以定义自定义的聚合操作,以满足各种复杂的数据处理需求。

注意 除了这三个函数外,AggregateFunction 接口通常还包含一个 createAccumulator 方法,用于初始化一个新的累加器实例。该方法在聚合操作开始时被调用,并返回一个空的或初始化的累加器状态。

举例代码:

import java.util.Calendar
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Randomcase class StockPrice(stockId:String,timeStamp:Long,price:Double) object AggregateWindowFunctionTest {def main(args: Array[String]) { // 设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//设置为处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //创建数据源,股票价格数据流val stockPriceStream: DataStream[StockPrice] = env .addSource(new StockPriceSource)    //该数据流由StockPriceSource类随机生成stockPriceStream.print("input“)    //设定针对数据集的转换操作逻辑val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).aggregate(new MyAggregateFunction)   //自定义的聚合函数,那么就需要实现三个方法//打印输出sumStream.print("output“) //程序触发执行env.execute("AggregateWindowFunctionTest")}class StockPriceSource extends RichSourceFunction[StockPrice]{var isRunning: Boolean = trueval rand = new Random()// 初始化股票价格var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)var stockId = 0var curPrice = 0.0d override def run(srcCtx: SourceContext[StockPrice]): Unit = {while (isRunning) {// 每次从列表中随机选择一只股票stockId = rand.nextInt(priceList.size)val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05priceList = priceList.updated(stockId, curPrice)val curTime = Calendar.getInstance.getTimeInMillis// 将数据源收集写入SourceContextsrcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))Thread.sleep(rand.nextInt(500))}}override def cancel(): Unit = {isRunning = false}}//自定义函数class MyAggregateFunction extends AggregateFunction[StockPrice,(String,Double,Long),(String,Double)] {//回忆一下:case class StockPrice(stockId:String,timeStamp:Long,price:Double) 
//返回值不要时间了,只要id 和price//创建累加器
override def createAccumulator(): (String,Double, Long) = ("",0D,0L)//定义把输入数据累加到累加器的逻辑override def add(input:StockPrice,acc:(String,Double,Long))={(input.stockId,acc._2+input.price,acc._3+1L)  //平均价格 所以数量  +1L}//根据累加器得出结果
override def getResult(acc:(String,Double,Long)) = (acc._1,acc._2 / acc._3)//定义累加器合并的逻辑override def merge(acc1:(String,Double,Long),acc2:(String,Double,Long)) = {(acc1._1,acc1._2+acc2._2,acc1._3+acc2._3)}}
}

这里我开始理解为了股票的三个属性(String,Double,Long),这里理解错了。

在 Apache Flink 的 AggregateFunction 接口中,当你定义一个自定义的聚合函数时,你需要指定三个类型参数:

1.输入类型(InputType):这是流中元素的类型,例子中为 StockPrice。
2.累加器类型(AccumulatorType):这是用于在聚合过程中存储中间状态的类型。例子中,这是一个三元组 (String, Double, Long),其中 String 表示股票ID(尽管这里的处理可能不是最理想的,因为通常累加器不应该包含像股票ID这样的非聚合字段),Double 表示价格的总和,Long 表示价格的数量(或说是处理了多少个价格数据点)。
3.输出类型(OutputType):这是聚合函数最终产生的结果类型。例子中,这也是一个二元组 (String, Double),其中 String 同样是股票ID(这里同样需要注意可能的逻辑问题),Double 是计算出的平均价格。

我才开始没有理解acc1._3 + acc2._3,现写如下:

acc1._1,acc1._2 + acc2._2, // 合并价格总和acc1._3 + acc2._3  // 合并价格数量

根据代码的先后顺序及运行顺序,最后执行getresult。
先聚合,最后平均。

在大多数情况下,add 方法会首先被调用,用于处理流入的数据并更新累加器状态。然后,根据并行度和数据分布,merge 方法可能会被调用以合并累加器状态。最后,在窗口触发或查询结束时,getResult 方法会被调用以提取最终的聚合结果。

代码分析完后,输出结果:

input> StockPrice(stock_2,1602040572049,29.99367518574229)
input> StockPrice(stock_2,1602040572205,30.03665296896211)
input> StockPrice(stock_2,1602040572601,30.00867347810531)
input> StockPrice(stock_0,1602040572856,9.974154737531954)
input> StockPrice(stock_1,1602040572934,19.997437804748245)
output> (stock_2,30.013000544269904)
output> (stock_1,19.997437804748245)
output> (stock_0,9.974154737531954)

3.3.3 FoldFunction

FoldFunction决定了窗口中的元素如何和一个输出类型的元素进行结合。对于每个进入窗口的元素而言,FoldFunction会被增量调用。窗口中的第一个元素将会和这个输出类型的初始值进行结合。需要注意的是,FoldFunction不能用于会话窗口和那些可合并的窗口。

//前面的代码和ReduceWindowFunctionTest程序中的代码相同,因此省略
val sumStream = stockPriceStream.keyBy(s => s.stockId).timeWindow(Time.seconds(1)).fold("CHINA_"){ (acc, v) => acc + v.stockId }

3.3.4 ProcessWindowFunction

前面提到的ReduceFunction和AggregateFunction都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大多数场景的需求

但是,在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到 ProcessWindowFunction,因为它能够更加灵活地支持基于窗口全部数据元素的结果计算。

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)
object ProcessWindowFunctionTest {def main(args: Array[String]) {//设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//设置为处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//创建数据源,股票价格数据流val source = env.socketTextStream("localhost", 9999)//指定针对数据流的转换操作逻辑val stockPriceStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))val sumStream = stockPriceStream.assignTimestampsAndWatermarks(WatermarkStrategy//为了测试方便,这里把水位线设置为0.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp})).keyBy(s => s.stockId).timeWindow(Time.seconds(3)).process(new MyProcessWindowFunction())//打印输出sumStream.print()//执行程序env.execute("ProcessWindowFunction Test")}class MyProcessWindowFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {//一个窗口结束的时候调用一次(一个分组执行一次),不适合大量数据,全量数据保存在内存中,会造成内存溢出override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {//聚合,注意:整个窗口的数据保存到Iterable,里面有很多行数据var sumPrice = 0.0;elements.foreach(stock => {sumPrice = sumPrice + stock.price})out.collect(key, sumPrice/elements.size)}}
}

这个代码里需要注意的是 .assignTimestampsAndWatermarks 和MyProcessWindowFunction

.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = element.timeStamp})
)

WatermarkStrategy.forBoundedOutOfOrderness 用于创建一个处理有界乱序数据的水位线策略。参数 Duration.ofSeconds(0) 表示没有乱序,即所有数据都是按时序到达的(在实际应用中,这通常是一个简化的假设,实际数据往往会有一定程度的乱序)。withTimestampAssigner 方法用于指定如何从数据元素中提取时间戳,这里是从 StockPrice 对象的 timeStamp 字段中提取。

process 方法用于应用一个自定义的 ProcessWindowFunction。在这个例子中,MyProcessWindowFunction 是一个自定义的窗口函数,它接收一个键(股票ID)、一个上下文对象(包含窗口的元数据,如开始和结束时间)、一个包含窗口内所有元素的迭代器,以及一个用于收集输出结果的收集器。

在 MyProcessWindowFunction 的 process 方法中,代码遍历了窗口内的所有 StockPrice 元素,计算了价格的总和,并计算了平均值(总和除以元素数量)。然后,它将结果(股票ID和平均价格)收集到输出流中。

下一小节该总结触发器啦。

相关文章:

【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序

DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 1. 窗口的划分 1.1 窗口分为&#xff1a;基于时间的窗口 和 基于数量的窗口 基于时间的窗口&#xff1a;基于起始时间戳 和终止时间戳来决定窗口的大小 基于数量的窗口&#xff1a;根据固定的数量定义窗口 的大小 这…...

DVWA靶场通过——文件上传漏洞

File Upload漏洞 它允许攻击者通过上传恶意文件来执行任意代码、窃取数据、获取服务器权限&#xff0c;甚至完全控制服务器。为了防止文件上传漏洞&#xff0c;开发者需要对文件上传过程进行严格的验证和处理。 1. 文件上传漏洞概述 文件上传漏洞发生在Web应用程序允许用户通过…...

原子类、AtomicLong、AtomicReference、AtomicIntegerFieldUpdater、LongAdder

原子类 JDK提供的原子类&#xff0c;即Atomic*类有很多&#xff0c;大体可做如下分类&#xff1a; 形式类别举例Atomic*基本类型原子类AtomicInteger、AtomicLong、AtomicBooleanAtomic*Array数组类型原子类AtomicIntegerArray、AtomicLongArray、AtomicReferenceArrayAtomic…...

MySQL(8)【聚合函数 | group by分组查询】

阅读导航 引言一、聚合函数1. 简介2. 使用示例&#xff08;1&#xff09;COUNT() 函数&#xff08;2&#xff09;SUM() 函数&#xff08;3&#xff09;AVG() 函数&#xff08;4&#xff09;MAX() 函数&#xff08;5&#xff09;MIN() 函数 二、group by分组查询1. 基本语法2. 按…...

如何监控Elasticsearch集群状态?

大家好&#xff0c;我是锋哥。今天分享关于【如何监控Elasticsearch集群状态&#xff1f;】面试题。希望对大家有帮助&#xff1b; 如何监控Elasticsearch集群状态&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 监控 Elasticsearch 集群的状态对于确保…...

React第七节 组件三大属性之 refs 的用法注意事项

1、定义 React 中refs 是允许我们操作DOM 访问组件实例的一种方案。开发人员可以直接使用 refs 访问操作DOM&#xff0c;而不用自身的数据状态&#xff0c;这种方案在实际开发过程中是有必要的&#xff0c;但是不建议通篇使用refs操作DOM&#xff0c;如果是这样&#xff0c;那…...

全文单词统计

目标&#xff1a;统计词频 import scala.io.Source //知识点 //1.字符串.split("分隔符")&#xff1a;把字符串用指定的分隔符。拆分成多份&#xff0c;保存在数组中 object test1 {def main(args: Array[String]): Unit { //从文件1.txt中读入内容val contentSourc…...

Angular v19 (二):响应式当红实现signal的详细介绍:它擅长做什么、不能做什么?以及与vue、svelte、react等框架的响应式实现对比

本文紧接着Angular v19 新版本来啦&#xff0c;一起瞧瞧新特性吧&#xff01;&#xff0c;主要针对它在v18引入了一项全新的响应式技术——Signal&#xff0c;这引起了开发者社区的广泛关注&#xff0c;最新的v19版本推出了更多的signal工具。Signal的加入旨在优化Angular的响应…...

【数据结构】二叉搜索树(二叉排序树)

&#x1f31f;&#x1f31f;作者主页&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所属专栏&#xff1a;数据结构 目录 前言 一、什么是二叉搜索树 二、二叉搜索树的实现 节点 属性和接口的声明 插入 查找 删除 拷贝构造 析构 中序遍历 三、二叉搜索树的…...

文件的摘要算法(md5、sm3、sha256、crc)

为了校验文件在传输中保证完整性和准确性&#xff0c;因此需要发送方先对源文件产生一个校验码&#xff0c;并将该值传输给接收方&#xff0c;将附件通过ftph或http方式传输后&#xff0c;由接收方使用相同的算法对接收文件再获取一个新的校验码&#xff0c;将该值和发送方传的…...

Python实现人生重开模拟器

目录 人生重开模拟器介绍 代码实现 打印初始界面 设置初始属性 设置角色性别 设置角色出生点 针对每一岁&#xff0c;生成人生经历 完整代码 人生重开模拟器介绍 人生重开模拟器 是之前比较火的一个小游戏&#xff0c;我们这里使用 Python 实现一个简化版的 人生重开模…...

机器学习(二十五):决策树算法以及决策树和神经网络的对比

一、决策树集合 单一决策树会对训练数据的变化很敏感。例子&#xff1a;输入十个数据&#xff0c;判断是否是猫。只替换其中一个数据&#xff0c;信息增益最高的分裂特征就发生了改变&#xff0c;决策树就发生了变化。 使用决策树集合可以使算法更加健壮。例子&#xff1a;使用…...

k8s运行运行pod报错超出文件描述符表限制

1.问题描述 运行pod超过文件描述符表 unable to allocate file descriptor table - out of memory/opt/COMMAND.sh: line 9: 2.查看设备的文件描述符限制 操作前一定要先查询这个值&#xff0c;2097152这个值即为我们可设置的最大值&#xff0c;超过这个值后将无法登录&am…...

非常简单实用的前后端分离项目-仓库管理系统(Springboot+Vue)part 2

七、创建前端项目 你下载了nodejs吗&#xff1f;从cn官网下载&#xff1a;http://nodejs.cn/download/&#xff0c;或者从一个国外org网站下载&#xff0c;选择自己想要的版本https://nodejs.org/download/release/&#xff0c;双击下载好的安装文件&#xff0c;选择安装路径安…...

开源 AI 智能名片 2 + 1 链动模式 S2B2C 商城小程序源码助力品牌共建:价值、策略与实践

摘要&#xff1a;在当今数字化商业环境下&#xff0c;品牌构建已演变为企业与消费者深度共建的过程。本文聚焦于“开源 AI 智能名片 2 1 链动模式 S2B2C 商城小程序源码”&#xff0c;探讨其如何融入品牌建设&#xff0c;通过剖析品牌价值构成&#xff0c;阐述该技术工具在助力…...

微信小程序中的WXSS与CSS的关系及使用技巧

微信小程序中的WXSS与CSS的关系及使用技巧 引言 在微信小程序的开发中,样式的设计与实现是构建用户友好界面的关键。微信小程序使用WXSS(WeiXin Style Sheets)作为其样式表语言,WXSS在语法上与CSS非常相似,但也有一些独特的特性。本文将深入探讨WXSS与CSS的关系,介绍WX…...

STM32的CAN波特率计算

公式&#xff1a; CAN波特率 APB总线频率 / &#xff08;BRP分频器 1&#xff09;/ (SWJ BS1 BS2) SWJ一般为1。 例如STM32F407的&#xff0c;CAN1和CAN2都在在APB1下&#xff0c;频率是42000000 如果想配置成1M波特率&#xff0c;则计算公式为&#xff1a;...

【LeetCode面试150】——57插入区间

博客昵称&#xff1a;沈小农学编程 作者简介&#xff1a;一名在读硕士&#xff0c;定期更新相关算法面试题&#xff0c;欢迎关注小弟&#xff01; PS&#xff1a;哈喽&#xff01;各位CSDN的uu们&#xff0c;我是你的小弟沈小农&#xff0c;希望我的文章能帮助到你。欢迎大家在…...

活着就好20241128

早晨问候&#xff1a; 亲爱的朋友们&#xff0c;大家早上好&#xff01;今天是2024年11月28日&#xff0c;第48周的第四天&#xff0c;也是十一月的第二十八天&#xff0c;农历甲辰[龙]年十月廿四。在这个即将步入月末、阳光依旧明媚的清晨&#xff0c;愿第一缕阳光轻轻洒落在…...

【kafka03】消息队列与微服务之Kafka 读写数据

Kafka 读写数据 参考文档 Apache Kafka 常见命令 kafka-topics.sh #消息的管理命令 kafka-console-producer.sh #生产者的模拟命令 kafka-console-consumer.sh #消费者的模拟命令 创建 Topic 创建topic名为 chen&#xff0c;partitions(分区)为3&#xff0…...

【Agorversev1.1数据转换】Agorverse高清地图转OpenStreetMap及SUMO路网

文章目录 Agorverse高清地图转OpenStreetMap及SUMO路网1. Agorverse osm转换说明2. 转换源码3. 处理效果4. SUMO-Carla联合仿真 Agorverse高清地图转OpenStreetMap及SUMO路网 1. Agorverse osm转换说明 根据作者的描述&#xff0c;其高清地图的osm文件与标准osm的区别在于以下…...

Vue 3 实现高性能拖拽指令的最佳实践

前言 在现代前端开发中&#xff0c;拖拽功能是增强用户体验的重要手段之一。本文将详细介绍如何在 Vue 3 中封装一个拖拽指令&#xff08;v-draggable&#xff09;&#xff0c;并通过实战例子演示其实现过程。通过这篇教程&#xff0c;您将不仅掌握基础的拖拽功能&#xff0c;…...

AIGC--------AIGC在医疗健康领域的潜力

AIGC在医疗健康领域的潜力 引言 AIGC&#xff08;Artificial Intelligence Generated Content&#xff0c;人工智能生成内容&#xff09;是一种通过深度学习和自然语言处理&#xff08;NLP&#xff09;等技术生成内容的方式。近年来&#xff0c;AIGC在医疗健康领域展现出了极…...

Apache Calcite - calcite jdbc驱动使用场景

前言 在使用Calcite查询数据时通常会用到这些代码获取schema Connection connection DriverManager.getConnection("jdbc:calcite:", info); CalciteConnection calciteConnection connection.unwrap(CalciteConnection.class); SchemaPlus rootSchema calciteC…...

IEC61850实现方案和测试-4-MMS协议

IEC61850实现方案和测试-4作为介绍实现方案和测试的第四篇文章&#xff0c;后续会继续更新&#xff0c;欢迎关注。前三篇如下 第一篇是&#xff1a;IEC61850实现方案和测试-1-CSDN博客 第二篇是&#xff1a;IEC61850实现方案和测试-2-UCA-CSDN博客 第三篇是&#xff1a;IEC6…...

【ubuntu24.04】GTX4700 配置安装cuda

筛选显卡驱动显卡驱动 NVIDIA-Linux-x86_64-550.135.run 而后重启:最新的是12.6 用于ubuntu24.04 ,但是我的4700的显卡驱动要求12.4 cuda...

时间的礼物:如何珍视每一刻

《时间的礼物&#xff1a;如何珍视每一刻》 夫时间者&#xff0c;宇宙之精髓&#xff0c;生命之经纬&#xff0c;悄无声息而流转不息&#xff0c;如织锦之细线&#xff0c;串联古今&#xff0c;贯穿万物。 人生短暂&#xff0c;犹如白驹过隙&#xff0c;倏忽而逝&#xff0c;…...

componentReceivePropsreact class生命周期

componentReceiveProps并不是有props的变化触发&#xff0c;而是由父组件的更新触发的 父组件导致组件重新渲染&#xff0c;即使props没有更改&#xff0c;也会调用componentReceiveProps这个方法&#xff1b;如果只想处理更改&#xff0c;确保当前值与变更值比较--官方 …...

快速理解微服务中Sentinel怎么实现限流

Sentinel是通过动态管理限流规则&#xff0c;根据定义的规则对请求进行限流控制。 一.实现步骤 1.定义资源&#xff1a;在Sentinel中&#xff0c;资源可以是URL、方法等&#xff0c;用于标识需要进行限流的请求&#xff1b;(在Sentinel中&#xff0c;需要我们去告诉Sentinel哪些…...

25.100ASK_T113-PRO 测试摄像头(型号)

1.摄像头 USB2.0 摄像头,支持 UVC协议, 就是V4L2 USB2.0 大概可这样理解吧.这个是2K分辨率. 2.8mm焦距. 开发板还是 100ASK_T113-PRO V1.2版 2.查看摄像头驱动挂载情况 这样接好. 看看设备有没有挂载上 # ls /dev/video* /dev/video0 /dev/video1 这两个就是USB摄像头.说…...

20241127 给typecho文章编辑附件 添加视频 图片预览

Typecho在写文章时&#xff0c;如果一次性上传太多张图片可能分不清哪张&#xff0c;因为附件没有略缩图&#xff0c;无法实时阅览图片&#xff0c;给文章插入图片时很不方便。 编辑admin/file-upload.php 大约十八行的位置 一个while 循环里面,这是在进行html元素更新操作,在合…...

StarRocks-join优化

1、背景 有两个大表&#xff0c;都是6kw级别上下的&#xff0c;通过SR然后包装了一个接口对外提供查询&#xff0c;当前的问题是&#xff0c;这样大的join查询会导致BE直接宕机。并且这个sql很有代表性&#xff0c;我截图如下&#xff1a; 这个表是个单分区&#xff0c;所以直接…...

如何通过ChatGPT提高自己的编程水平

在编程学习的过程中&#xff0c;开发者往往会遇到各种各样的技术难题和学习瓶颈。传统的学习方法依赖书籍、教程、视频等&#xff0c;但随着技术的不断发展&#xff0c;AI助手的崛起为编程学习带来了全新的机遇。ChatGPT&#xff0c;作为一种强大的自然语言处理工具&#xff0c…...

实时数据开发 | checkpoints监控和调优

监控Checkpoints 监控 checkpoint 行为最简单的方法是通过 UI 的 checkpoint 部分。 监控这两个指标: 算子收到第一个 checkpoint barrier 的时间。当触发 checkpoint 的耗费时间一直很高时&#xff0c;这意味着 checkpoint barrier 需要很长时间才能从 source 到达 operator…...

面试手撕题积累

1、实现滑动窗口限流&#xff0c;允许每分钟最多有100个请求 阿里一面题。 核心&#xff1a; 时间窗口管理&#xff1a;滑动窗口会根据时间流逝不断更新&#xff0c;需要记录请求的时间戳&#xff0c;并根据当前时间计算窗口内的请求数量。 限流判断&#xff1a;每次请求到来…...

开发中使用UML的流程_05 PIM-1:分析系统流程

目录 1、概述 2、PIM生成的过程 3、用例叙述格式 4、用例关系 5、执行流程&#xff1a; 6、惯用的编号方式 1、概述 在进入到PIM阶段之后&#xff0c;系统分析员将所有系统用例依相关性分成若干组&#xff0c;以组别方式生成该组系统用例涉及的PIM-1---PIM-4产生结果&am…...

【Go】-go中的锁机制

目录 一、锁的基础知识 1. 互斥量/互斥锁 2. CAS&#xff08;compare and swap&#xff09; 3. 自旋锁 4. 读写锁 5. 乐观锁 & 悲观锁 6. 死锁 二、go中锁机制 1. Mutex-互斥锁 2. RWMutex-读写锁 2.1 RWMutex流程概览 2.2 写锁饥饿问题 2.3. golang的读写锁源…...

Scala学习记录,全文单词统计

package test32 import java.io.PrintWriter import scala.io.Source //知识点 // 字符串.split("分隔符"&#xff1a;把字符串用指定的分隔符&#xff0c;拆分成多个部分&#xff0c;保存在数组中) object test {def main(args: Array[String]): Unit {//从文件1.t…...

重构项目架构

前言 我们上篇文章对整个项目进行一个整体的规划&#xff0c;其中对于APP类规划了类&#xff0c;本篇文章我们就来实现这个规划&#xff1b; class App {//加载页面constructor() {}//获取位置_getPosition() {}//接受位置_loadMap() {}//在地图上点击展现表单_showForm() {}/…...

一个开源轻量级的服务器资源监控平台,支持告警推送

大家好&#xff0c;今天给大家分享一款开源的轻量级服务器资源监控工具Beszel&#xff0c;提供历史数据记录、Docker容器统计信息监控以及多种警报功能&#xff0c;用于监控服务器资源。 项目介绍 Beszel由hub&#xff08;中心服务器端应用&#xff0c;基于PocketBase构建&…...

介绍一下atof(arr);(c基础)

hi , I am 36 适合对象c语言初学者 atof(arr)&#xff1b;是返回浮点数(double型)&#xff0c;浮点数数是arr数组中字符中数字 格式 #include<stdio.h> atof(arr); 返回值arr数组中的数 未改变arr数组 #include<stdio.h> //atof(arr) 返 <stdlib> int…...

基于微信小程序的平价药房管理系统+LW参考示例

1.项目介绍 系统角色&#xff1a;管理员、医生、普通用户功能模块&#xff1a;用户管理、医生管理、药品分类管理、药品信息管理、在线问诊管理、生活常识管理、日常提醒管理、过期处理、订单管理等技术选型&#xff1a;SpringBoot&#xff0c;Vue&#xff0c;uniapp等测试环境…...

什么是 C++ 中的函数对象?它有什么特点?如何定义和使用函数对象?数对象与普通函数有什么区别?

在 C 中&#xff0c;函数对象&#xff08;Function Object&#xff09;也被称为仿函数&#xff08;Functor&#xff09;&#xff0c;是一种可以像函数一样被调用的对象&#xff0c;是一个类的对象&#xff0c;该类重载了函数调用运算符operator()。 函数对象的特点: 与普通函数…...

JAVA篇05 —— 内部类(Local、Anonymous、Member、Static)

欢迎来到我的主页&#xff1a;【一只认真写代码的程序猿】 本篇文章收录于专栏【小小爪哇】 如果这篇文章对你有帮助&#xff0c;希望点赞收藏加关注啦~ 目录 1 内部类Inner Class 1.1 局部内部类 1.2 匿名内部类&#xff08;※※&#xff09; 1.3 匿名类最佳实践&#xf…...

vmware安装ubuntu22.04 复制黏贴 上网

1、ubuntu下载 [Download - 清华镜像站]&#xff1a; 点击 清华大学开源软件镜像站 - Ubuntu 22.04.4 下载 页面中的 ubuntu-22.04.4-desktop-amd64.iso清华大学开源软件镜像站 - Ubuntu 22.04.4 下载 2、安装向导 3、网络设置 sudo netplan try sudo netplan apply4、复制…...

Spring Bean初始化流程

首先&#xff1a; 加载Bean定义(Configuration) 然后对于每个Bean&#xff1a; 1、实例化Bean&#xff08;应该是从Bean方法中获取&#xff0c;Bean方法里面包含new这个类型的代码&#xff09;2、依赖注入&#xff08;所依赖的Bean要经历相同的流程&#xff09;、调用Setter…...

C 语言函数递归探秘:从基础概念到复杂问题求解的进阶之路

我的个人主页 我的专栏&#xff1a;C语言&#xff0c;希望能帮助到大家&#xff01;&#xff01;&#xff01;点赞❤ 收藏❤ 目录 什么是函数递归递归的基本组成递归的工作原理递归的优缺点递归的经典案例 5.1 阶乘计算5.2 斐波那契数列5.3 汉诺塔问题5.4 二分查找 递归的高级…...

【Zookeeper】三,Zookeeper的安装与基本操作

文章目录 安装Zookeeper下载解压解压后的目录结构运行Zookeeper 基本操作 安装Zookeeper 下载 官网下载Zookeeper&#xff0c;会得到一个tar包&#xff0c;如&#xff1a;apache-zookeeper-3.8.4-bin.tar.gz 解压 tar -xvf apache-zookeeper-3.8.4-bin.tar.gz -C /usr/loca…...

STL算法之数值算法<stl_numeric.h>

这一节介绍的算法&#xff0c;统称为数值(numeric)算法。STL规定&#xff0c;欲使用它们&#xff0c;客户端必须包含头文件<numeric>.SGI将它们实现与<stl_numeric.h>文件中。 目录 运用实例 accumulate adjacent_difference inner_product partial_sum pow…...

Git 入门超简单指南

1. 什么是 Git&#xff1f; Git 是一个分布式版本控制系统&#xff0c;由 Linus Torvalds 于 2005 年创建。它的主要目的是帮助开发者有效地管理和跟踪项目的历史版本。通过使用 Git&#xff0c;你可以轻松地记录每一次代码的修改&#xff0c;回滚到以前的版本&#xff0c;以及…...