当前位置: 首页 > news >正文

【Flink-scala】DataStream编程模型之延迟数据处理

DataStream API编程模型

1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之水位线
4.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器


文章目录

  • DataStream API编程模型
  • 一、延迟数据处理
    • 1.1 侧输出
    • 1.2代码实例
      • 1.2.1 代码运行结果
      • 1.2.2 结果分析
    • 1.3代码运行展示
    • 1.4 题外话


一、延迟数据处理

前一小节已经讲了水位线的相关概念,默认情况下,当水位线超过窗口结束时间之后,再有之前的数据到达时,这些数据会被删除。

为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。

allowedLateness就是针对事件时间而言,对于水位线超过窗口结束时间之后,还允许有一段时间(也是以事件时间来衡量)来等待之前的数据到达,以便再次处理这些数据。

对于窗口计算而言,如果没有设置allowedLateness,窗口触发计算以后就会被销毁

设置了allowedLateness以后,只有水位线大于“窗口结束时间+allowedLateness”时,窗口才会被销毁。

当没有指定allowedLateness,默认值为0.

1.1 侧输出

通常情况下,用户虽然希望对迟到的数据进行窗口计算,但并不想将结果混入正常的计算流程中,而是想将延迟数据和结果保存到数据库中,便于后期对延时数据进行分析。对这种情况,就需要借助于“侧输出”(Side Output)来处理

用sideOutputLateData(OutputTag)来标记迟到数据计算的结果,然后再使用getSideOutput(lateOutputTag)从窗口中获取lateOutputTag标签对应的数据,之后转成独立的DataStream数据集进行处理

1.2代码实例

如下:

import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Timecase class StockPrice(stockId:String,timeStamp:Long,price:Double)object AllowedLatenessTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设定时间特性为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设定程序并行度env.setParallelism(1)//创建数据源val source = env.socketTextStream("localhost", 9999)//指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))//为数据流分配时间戳和水位线val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//执行窗口计算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L))//注意这里.sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))//打印输出sumStream.print("window计算结果:")val late = sumStream.getSideOutput(lateData)late.print("迟到的数据:")//指定名称并触发流计算env.execute("AllowedLatenessTest")}//指定水位线生成策略class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {element.timeStamp //从到达消息中提取时间戳}}}override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={new WatermarkGenerator[StockPrice](){val maxOutOfOrderness = 10000L //设定最大延迟为10秒var currentMaxTimestamp: Long = 0Lvar a: Watermark = nullval format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)output.emitWatermark(a)println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)}
override def onPeriodicEmit(output:WatermarkOutput): Unit = {// 没有使用周期性发送水印,因此这里没有执行任何操作}}}}
}

先注意这里:

  val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//执行窗口计算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L))   //注意这里.sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))

允许元素延迟到达最多 2 秒。即,如果一个元素的时间戳在窗口结束后的 2 秒内到达,它仍然会被包含在窗口的计算中。

 .allowedLateness(Time.seconds(2L))  设置时间为2s。

其余的就是生成水位线和时间戳了,这就不用解释啦。

1.2.1 代码运行结果

1.在nc端输入
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32
然后启动程序

在这里插入图片描述
分析:水位线生成策略:

每次收到一个新的事件,都会比较当前事件的时间戳和之前记录的最大时间戳,更新 currentMaxTimestamp。
水位线被设定为 currentMaxTimestamp - maxOutOfOrderness,即允许最大 10 秒的延迟

再来看迟到数据:

输入数据是:
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32

数据超出窗口结算数据后2s到达算迟到,任何时间戳比水位线晚 10 秒的事件都会被视为迟到。
之前我看书上写的有点疑问,然后拥有环境的网站也有点问题,我就自己本地了,windows 下载了个netcat,运行结果如下:
在这里插入图片描述

timestamp:stock_1,1602031567000|2020-10-07 08:46:07.000,1602031567000|2020-10-07 08:46:07.000,Watermark @ 1602031557000 (2020-10-07 08:45:57.000)
timestamp:stock_1,1602031571000|2020-10-07 08:46:11.000,1602031571000|2020-10-07 08:46:11.000,Watermark @ 1602031561000 (2020-10-07 08:46:01.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031577000|2020-10-07 08:46:17.000,Watermark @ 1602031567000 (2020-10-07 08:46:07.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031578000|2020-10-07 08:46:18.000,Watermark @ 1602031568000 (2020-10-07 08:46:08.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031579000|2020-10-07 08:46:19.000,Watermark @ 1602031569000 (2020-10-07 08:46:09.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031581000|2020-10-07 08:46:21.000,Watermark @ 1602031571000 (2020-10-07 08:46:11.000)
timestamp:stock_1,1602031582000|2020-10-07 08:46:22.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031582000|2020-10-07 08:46:22.000,Watermark @ 1602031572000 (2020-10-07 08:46:12.000)
timestamp:stock_1,1602031591000|2020-10-07 08:46:31.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031581000|2020-10-07 08:46:21.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031580000|2020-10-07 08:46:20.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031579000|2020-10-07 08:46:19.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
timestamp:stock_1,1602031578000|2020-10-07 08:46:18.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
window计算结果:> StockPrice(stock_1,1602031567000,8.14)
window计算结果:> StockPrice(stock_1,1602031571000,8.23)
window计算结果:> StockPrice(stock_1,1602031577000,16.48)
window计算结果:> StockPrice(stock_1,1602031578000,25.970000000000002)
window计算结果:> StockPrice(stock_1,1602031578000,34.42)
window计算结果:> StockPrice(stock_1,1602031578000,42.75)
window计算结果:> StockPrice(stock_1,1602031578000,51.31)
timestamp:stock_1,1602031577000|2020-10-07 08:46:17.000,1602031591000|2020-10-07 08:46:31.000,Watermark @ 1602031581000 (2020-10-07 08:46:21.000)
迟到的数据:> StockPrice(stock_1,1602031577000,8.32)

这里是windows运行结果,可能稍微和ubuntu不一样

1.2.2 结果分析

  • stock_1,1602031567000,8.14
    水位线 557
    窗口时间**[567,570)**
    当前事件:stock_1,1602031567000,8.14
window计算结果:> StockPrice(stock_1,1602031567000,8.14)
  • stock_1,1602031571000,8.23
    水位线: 561
    窗口时间:[571,573)
    当前窗口事件 stock_1,1602031571000,8.23
window计算结果:> StockPrice(stock_1,1602031571000,8.23)
  • stock_1,1602031577000,8.24
    水位线:567
    窗口时间:[577,580)
    当前窗口事件:stock_1,1602031577000,8.24
    8.24+8.23
window计算结果:> StockPrice(stock_1,1602031577000,16.48)
  • stock_1,1602031578000,8.87
    水位线:568
    窗口时间:和上一个相同 [577,580)
    当前窗口事件:stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24
    16.48+8.87
window计算结果:> StockPrice(stock_1,1602031578000,25.970000000000002)
  • stock_1,1602031579000,8.55
    水位线:569
    窗口时间:和上一个相同 [577,580)
    当前窗口事件(3个)
    stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24

    stock_1,1602031579000,8.55
    25.97+8.55
window计算结果:> StockPrice(stock_1,1602031578000,34.42)
  • stock_1,1602031577000,8.24
    水位线:这里-10s为567,最大水位线为569,那么最大水位线569
    窗口时间: 属于[567,570) 没有迟到
    当前窗口事件:
    stock_1,1602031567000,8.14
    stock_1,1602031577000,8.24
    34.42+8.24
window计算结果:> StockPrice(stock_1,1602031578000,42.75)
  • stock_1,1602031581000,8.43
    水位线:571
    窗口时间:[581,584)
    当前窗口事件 stock_1,1602031581000,8.43
    注意 571水位线,第一个事件事件窗口时间[567,570),这个就应该结束计算了、
window计算结果:> StockPrice(stock_1,1602031578000,51.31)
之后就没了
为什么没了呢,窗口还没计算完呢,一直在监听啊。
水位线也不涨了
  • stock_1,1602031582000,8.78
    水位线:572
    窗口时间:[581,584)
    当前窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78

  • stock_1,1602031581000,8.76
    水位线:572
    窗口时间:[581,584)
    当前窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78

    stock_1,1602031581000,8.76

  • stock_1,1602031579000,8.55
    水位线:572
    窗口时间:[577,580)
    当前窗口事件:(4个)
    stock_1,1602031578000,8.87
    stock_1,1602031577000,8.24
    stock_1,1602031579000,8.55

    stock_1,1602031579000,8.55

  • stock_1,1602031591000,8.13
    水位线:581
    窗口时间:[591,594)
    当前窗口事件:
    stock_1,1602031591000,8.13

  • stock_1,1602031581000,8.34
    -水位线:581
    窗口时间:[581,584)
    (此时:[577,580)就应该结束了)
    窗口事件:
    stock_1,1602031581000,8.43
    stock_1,1602031582000,8.78
    stock_1,1602031581000,8.76

    stock_1,1602031581000,8.34

  • stock_1,1602031580000,8.45
    水位线:581
    窗口时间 [580,583)
    窗口事件:stock_1,1602031580000,8.45

  • stock_1,1602031579000,8.33
    水位线:581
    窗口时间:[577,580) 水位线在581的时候就结束了,但它在 allowedLateness 的 2 秒内到达,因此它 不会被视为迟到数据。

  • stock_1,1602031578000,8.56
    水位线:581
    窗口时间属于 [577,580)
    判断水位线和时间戳的关系:水位线超过窗口结束时间
    事件是否迟到:允许超出2s,正好就在580的边界,根据 Flink 的延迟数据规则,任何事件的时间戳如果大于当前窗口结束时间,并且超出了 allowedLateness(即 2 秒),就会被视为迟到数据。这个事件 stock_1,1602031578000,8.56 的时间戳刚好等于窗口 [577, 580) 的结束时间,并且它的时间戳在当前窗口结束后刚好到达,所以 它不会被视为迟到数据。

找了chatgpt问了一下:

这个事件不算迟到数据。尽管它的时间戳正好是窗口结束的边界,但它仍然属于这个窗口 [577, 580) 的一部分,不超出窗口时间范围,也没有超过 allowedLateness 设置的 2 秒,所以它被处理为正常数据。
为什么不算迟到?
Flink 的窗口操作是基于事件时间戳的,窗口 [577, 580) 的结束时间是 1602031578000。当水位线达到 581 时,窗口已经结束,但如果事件的时间戳恰好是窗口的结束时间,它依然被认为是属于该窗口的。
迟到数据的定义:迟到数据是指事件的时间戳超过了当前窗口结束时间,并且超出了允许的迟到时间。在这种情况下,事件时间戳与窗口结束时间对齐,因此并不算迟到。

  • stock_1,1602031577000,8.32
    水位线 581
    窗口时间:[577,580)
    577+2<581
    当前事件的时间戳 1602031577000 恰好等于窗口的起始时间 577,但水位线已经推进到了 581。
    窗口已经结束:水位线达到 581 时,窗口 [577, 580) 的数据已经完全处理完毕,因此任何时间戳在 580 之后到达的事件,都被视为迟到数据。

  • 说明:其中的删除线,我是手动分析的他们如何迟到,但是后来代码发现,数据是一个个来的,你没有超出水位线,那么我就立马把你这条数据计算了,因此输出结果就是累加起来的。

1.3代码运行展示

本来使用的是头歌平台提供的环境,但是环境有问题了,自己手动搭建了一下。幸亏是socket,windows还能实现监听自己,就把代码放在这里啦!
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Flink_scala2.13</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.11.0</flink.version> <!-- 使用 Flink 1.15.0 --><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.16</scala.version></properties><dependencies><!-- 引入 Flink 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency></dependencies>
</project>

test1.scala (AllowedLatenessTest函数)


/*
stock_1,1602031567000,8.14
stock_1,1602031571000,8.23
stock_1,1602031577000,8.24
stock_1,1602031578000,8.87
stock_1,1602031579000,8.55
stock_1,1602031577000,8.24
stock_1,1602031581000,8.43
stock_1,1602031582000,8.78
stock_1,1602031581000,8.76
stock_1,1602031579000,8.55
stock_1,1602031591000,8.13
stock_1,1602031581000,8.34
stock_1,1602031580000,8.45
stock_1,1602031579000,8.33
stock_1,1602031578000,8.56
stock_1,1602031577000,8.32*/
import java.text.SimpleDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object test1 {def main(args: Array[String]): Unit = {// *************************** Begin ****************************//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设定时间特性为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设定程序并行度env.setParallelism(1)//创建数据源val source = env.socketTextStream("localhost", 9999)//指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s=>StockPrice(s(0).toString,s(1).toLong,s(2).toDouble))//为数据流分配时间戳和水位线val watermarkDataStream = stockDataStream.assignTimestampsAndWatermarks(new MyWatermarkStrategy)//执行窗口计算val lateData = new OutputTag[StockPrice]("late")val sumStream = watermarkDataStream.keyBy("stockId").window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2L)).sideOutputLateData(lateData).reduce((s1, s2) => StockPrice(s1.stockId,s1.timeStamp, s1.price + s2.price))// **************************** End *****************************//打印输出sumStream.print("window计算结果:")val late = sumStream.getSideOutput(lateData)late.print("迟到的数据:")//指定名称并触发流计算env.execute("AllowedLatenessTest")}//指定水位线生成策略class MyWatermarkStrategy extends WatermarkStrategy[StockPrice] {override def createTimestampAssigner(context:TimestampAssignerSupplier.Context):TimestampAssigner[StockPrice]={new SerializableTimestampAssigner[StockPrice] {override def extractTimestamp(element: StockPrice, recordTimestamp: Long): Long = {element.timeStamp //从到达消息中提取时间戳}}}override def createWatermarkGenerator(context:WatermarkGeneratorSupplier.Context): WatermarkGenerator[StockPrice] ={new WatermarkGenerator[StockPrice](){val maxOutOfOrderness = 10000L //设定最大延迟为10秒var currentMaxTimestamp: Long = 0Lvar a: Watermark = nullval format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")override def onEvent(element: StockPrice, eventTimestamp: Long, output:WatermarkOutput): Unit = {currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp)a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)output.emitWatermark(a)println("timestamp:" + element.stockId + "," + element.timeStamp + "|" + format.format(element.timeStamp) + "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + "," + a.toString)}override def onPeriodicEmit(output:WatermarkOutput): Unit = {// 没有使用周期性发送水印,因此这里没有执行任何操作}}}}
}

这个是头歌提供的pom.xml,二者应该都能使用,但是你需要下载flink和scala,这不再多说,版本还要匹配哦

<project><groupId>cn.edu.xmu.dblab</groupId><artifactId>wordcount_myID</artifactId><modelVersion>4.0.0</modelVersion><name>WordCount</name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.12.2</scala.version><scala.binary.version>2.12</scala.binary.version><hadoop.version>2.7.7</hadoop.version><flink.version>1.11.2</flink.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.8.0</version><scope>compile</scope></dependency></dependencies>
<build><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

1.4 题外话

我使用的课本是厦门大学林子雨老师编写的《Flink编程基础》,当前我用的书是2021年9月第一版,第5.6章节的延迟数据处理中(201页),最后迟到的数据8.43写错了,应该是:8.32。正是书上写错了,我才有点疑问发现此处的问题。

相关文章:

【Flink-scala】DataStream编程模型之延迟数据处理

DataStream API编程模型 1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之水位线 4.【Flink-scala】DataStream编程模型之窗口计算-触发器-…...

samout llm解码 幻觉更低更稳定

这段代码定义了一个简单的对话生成系统&#xff0c;包括模型加载、词汇表加载、以及基于给定提示生成文本的功能。下面是对代码的解析&#xff1a; load_model_and_voc(device"cpu"): 该函数用于加载预训练的模型和词汇表&#xff08;vocabulary&#xff09;。它首先…...

python学opencv|读取图像(十六)修改HSV图像HSV值

【1】引言 前序学习进程中&#xff0c;我们已经掌握了对HSV通道和BGR通道的拆分和合并&#xff0c;并通过自由组合的形式&#xff0c;获得了和初始图像完全不一样的新图像&#xff0c;相关文章可以参考下述链接&#xff1a; python学opencv|读取图像&#xff08;十四&#xf…...

nginx自定义错误页面

一、Nginx 自定义错误页面笔记 error_page 指令 语法&#xff1a;error_page error_code [|answer-code] uri;作用&#xff1a;用于定义当特定的 HTTP 错误发生时&#xff0c;Nginx 应该返回给客户端的自定义页面。error_code 是需要自定义页面的 HTTP 错误码&#xff0c;例如 …...

通过枚举值调用函数

在做业务的时候&#xff0c;需要根据前端传递的不同枚举参数&#xff08;比如说0&#xff0c;1&#xff09;返回对应固定的值。但是这个值需要根据时间又有所变化。我们可以使用if-else去实现对应的逻辑&#xff0c;比如说&#xff0c;当前端传递参数为0是&#xff0c;需要返回…...

[手机Linux] 六,ubuntu18.04私有网盘(NextCloud)安装

一&#xff0c;LNMP介绍 LNMP一键安装包是一个用Linux Shell编写的可以为CentOS/RHEL/Fedora/Debian/Ubuntu/Raspbian/Deepin/Alibaba/Amazon/Mint/Oracle/Rocky/Alma/Kali/UOS/银河麒麟/openEuler/Anolis OS Linux VPS或独立主机安装LNMP(Nginx/MySQL/PHP)、LNMPA(Nginx/MySQ…...

powershell(1)

免责声明 学习视频来自 B 站up主泷羽sec&#xff0c;如涉及侵权马上删除文章。 笔记的只是方便各位师傅学习知识&#xff0c;以下代码、网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 泷羽sec官网&#xff1a;http…...

Message Processing With Spring Integration高级应用:自定义消息通道与端点

一、Spring Integration 简介 Spring Integration 是 Spring 框架的扩展&#xff0c;支持企业集成模式&#xff08;EIP&#xff09;&#xff0c;提供轻量级的消息处理功能&#xff0c;帮助开发者构建可维护、可测试的企业集成解决方案。 核心目标&#xff1a; 提供简单的模型…...

CUDA从入门到精通(六)——CUDA编程模型(二)

1. 核函数类型限定符 CUDA 核函数的常用函数类型限定符及其相关信息的表格&#xff1a; 限定符执行端调用方式备注__global__设备端&#xff08;GPU&#xff09;从主机代码使用 <<<...>>> 调用核函数用于声明核函数&#xff0c;在 GPU 上执行。只能从主机代…...

*【每日一题 基础题】 [蓝桥杯 2023 省 B] 飞机降落

题目描述 N 架飞机准备降落到某个只有一条跑道的机场。其中第 i 架飞机在 Ti 时刻到达机场上空&#xff0c;到达时它的剩余油料还可以继续盘旋 Di 个单位时间&#xff0c;即它最早可以于 Ti 时刻开始降落&#xff0c;最晚可以于 Ti Di 时刻开始降落。降落过程需要 Li个单位时间…...

作业Day4: 链表函数封装 ; 思维导图

目录 作业&#xff1a;实现链表剩下的操作&#xff1a; 任意位置删除 按位置修改 按值查找返回地址 反转 销毁 运行结果 思维导图 作业&#xff1a;实现链表剩下的操作&#xff1a; 1>任意位置删除 2>按位置修改 3>按值查找返回地址 4>反转 5>销毁 任意…...

线性规划中的几种逻辑表达式

线性规划中的几种逻辑表达式 注意&#xff1a; 摘录字刘博士的《数学建模与数学规划》&#xff0c; 以便用时可查。 实际上Gurobi API 中自身放啊变的逻辑表达式函数&#xff0c;下面列出自定义的实现方式。 1 逻辑与 如果 x 1 1 x_1 1 x1​1, x 2 1 x_2 1 x2​1, 那…...

NX二次开发通过内部函数获取面的面积MW_face_ask_area

获取动态库libmold.dll的路径 void TcharToChar(const TCHAR* tchar, char* _char) {int iLength; #if UNICODE//获取字节长度 iLength WideCharToMultiByte(CP_ACP, 0, tchar, -1, NULL, 0, NULL, NULL);//将tchar值赋给_char WideCharToMultiByte(CP_ACP, 0, tchar, -…...

初学stm32 ——— 串口通信

目录 STM32的串口通信接口 UART异步通信方式特点&#xff1a; 串口通信过程 STM32串口异步通信需要定义的参数: USART框图&#xff1a; 常用的串口相关寄存器 串口操作相关库函数 ​编辑 串口配置的一般步骤 STM32的串口通信接口 UART&#xff1a;通用异步收发器USART&am…...

分割双声道音频-Audacity和ffmpeg

双声道音频资源&#xff1a; https://download.csdn.net/download/yudelian/90135217 1、ffmpeg分割双声道音频 ffmpeg -i input.wav -map_channel 0.0.0 left.wav -map_channel 0.0.1 right.wav 2、audacity分割双生音频并且播放 选择分离立体声轨 可以看出分离出了两个音频…...

在 Spring Boot 3 中实现基于角色的访问控制

基于角色的访问控制 (RBAC) 是一种有价值的访问控制模型,可增强安全性、简化访问管理并提高效率。它在管理资源访问对安全和运营至关重要的复杂环境中尤其有益。 我们将做什么 我们有一个包含公共路由和受限路由的 Web API。受限路由需要数据库中用户的有效 JWT。 现在用户…...

MySQL追梦旅途之慢查询分析建议

一、找到慢查询 查询是否开启慢查询记录 show variables like "%slow%";log_slow_admin_statements&#xff1a; 决定是否将慢管理语句&#xff08;如 ALTER TABLE 等&#xff09;记录到慢查询日志中。 log_slow_extra &#xff1a; MySQL 和 MariaDB 中的一个系…...

电子应用设计方案-60:智能床垫系统方案设计

智能床垫系统方案设计 一、引言 智能床垫作为智能家居的一部分&#xff0c;旨在为用户提供更舒适的睡眠体验和健康监测功能。本方案将详细描述智能床垫系统的设计理念、功能模块及技术实现。 二、系统概述 1. 系统目标 - 实时监测睡眠状态&#xff0c;包括心率、呼吸、体动等…...

聊聊航空航天软件中常用的SIFT(Software-Implemented Fault Tolerance)三版本方案

一、SIFT技术 在软件程序控制流程中&#xff0c;特别是在SIFT&#xff08;Software-Implemented Fault Tolerance&#xff09;系统中使用三版本编程&#xff08;Three-Version Programming, 3VP&#xff09;意味着为同一个任务创建三个独立的软件版本。每个版本由不同的开发团…...

智能座舱进阶-应用框架层-Jetpack主要组件

Jetpack的分类 1. DataBinding&#xff1a;以声明方式将可观察数据绑定到界面元素&#xff0c;通常和ViewModel配合使用。 2. Lifecycle&#xff1a;用于管理Activity和Fragment的生命周期&#xff0c;可帮助开发者生成更易于维护的轻量级代码。 3. LiveData: 在底层数据库更…...

2024年底-Sre面试回顾

前言 背景: 2024.11月底 公司不大行了, 裁员收缩, 12月初开始面试, 2周大概面试了十几家公司, 3个2面要去线下, 有1个还不错的offer, 想结束战斗但还没到时候 个人情况: base上海 5年经验(2年实施3年运维半年开发) 面试岗位: Sre、云原生运维、驻场运维、高级运维、实施交付 …...

vue2使用render,js中写html

1、js部分table.js export default {name: "dadeT",data() {return {dades: 6666};},render(h) {return h(div, [h(span, 组件数据&#xff1a;${this.dades}), // 利用data里的dades数据&#xff0c;展示在页面上h(span, 89855545)]);} };2、vue部分 <templat…...

L2tp环境搭建笔记- Openwrt平台

L2tp环境搭建笔记- Openwrt平台 安装L2tp服务配置L2tp serverL2TP客户端配置(使用配置文件)L2TP客户端配置(LUCI)客户端 拔号(命令行方式)defaultroute路由问题L2TP(Layer 2 Tunneling Protocol)是一种工作在二层的隧道协议,是一种虚拟专用网络(VPN)协议。L2TP通常基…...

解决Nginx + Vue.js (ruoyi-vue) 单页应用(SPA) 404问题的指南

问题描述 在使用Vue.js构建的单页应用&#xff08;SPA&#xff09;中&#xff0c;特别是像ruoyi-vue这样的框架&#xff0c;如果启用了HTML5历史记录模式进行路由管理&#xff0c;那么用户直接访问子路径或刷新页面时可能会遇到404错误。这是因为当用户尝试访问一个非根路径时…...

Leetcode打卡:找到稳定山的下标

执行结果&#xff1a;通过 题目&#xff1a; 3258 找到稳定山的下标 有 n 座山排成一列&#xff0c;每座山都有一个高度。给你一个整数数组 height &#xff0c;其中 height[i] 表示第 i 座山的高度&#xff0c;再给你一个整数 threshold 。 对于下标不为 0 的一座山&#xf…...

51c嵌入式~单片机~合集3

我自己的原文哦~ https://blog.51cto.com/whaosoft/12362395 一、STM32代码远程升级之IAP编程 IAP是什么 有时项目上需要远程升级单片机程序&#xff0c;此时需要接触到IAP编程。 IAP即为In Application Programming&#xff0c;解释为在应用中编程&#xff0c;用户自己的…...

基于vue3实现小程序手机号一键登录

在Vue 3中实现小程序手机号一键登录&#xff0c;你需要结合小程序的API和Vue 3的框架特性。以下是一个基本的实现步骤和示例代码&#xff1a; 步骤 创建Vue 3项目&#xff1a;如果你还没有Vue 3项目&#xff0c;你需要先创建一个。这可以通过Vue CLI或者其他方式来完成。 集成…...

车辆重识别代码笔记12.19

1、resnet_ibn_a和resnet网络的区别 ResNet-IBN-A 是在 ResNet 基础上进行了一些改进的变种&#xff0c;具体来说&#xff0c;它引入了 Instance Batch Normalization (IBN) 的概念&#xff0c;这在某些任务中&#xff08;如图像识别、迁移学习等&#xff09;有显著的性能提升。…...

c语言---预处理

预处理的概念 预处理是C语言编译过程的第一个阶段。在这个阶段&#xff0c;预处理器会根据预处理指令对源程序进行处理&#xff0c;这些指令以#开头&#xff0c;比如#include、#define等。预处理的主要目的是对源程序进行文本替换和文件包含等操作&#xff0c;为后续的编译步骤…...

Spring Cloud Sleuth 分布式链路追踪入门

您好&#xff0c;我是今夜写代码,今天学习下分布式链路组件Spring Cloud Sleuth。 本文内容 介绍了分布式链路的思想 Sleuth 和 Zipkin 简单集成Demo,并不涉及 Sleuth原理。 为什么要用链路追踪&#xff1f; 微服务架构下&#xff0c;一个复杂的电商应用&#xff0c;完成下…...

无人机航测系统技术特点!

一、无人机航测系统的设计逻辑 无人机航测系统的设计逻辑主要围绕实现高效、准确、安全的航空摄影测量展开。其设计目标是通过无人机搭载相机和传感器&#xff0c;利用先进的飞行控制系统和数据处理技术&#xff0c;实现对地表信息的全方位、高精度获取。 需求分析&#xff1…...

uniapp使用腾讯地图接口的时候提示此key每秒请求量已达到上限或者提示此key每日调用量已达到上限问题解决

要在创建的key上添加配额 点击配额之后进入分配页面&#xff0c;分配完之后刷新uniapp就可以调用成功了。...

【Prompt Engineering】3.文本概括

一、引言 文本信息量大&#xff0c;LLM在文本概括任务上展现出强大能力。本章介绍如何通过编程方式调用API接口实现文本概括功能。 首先&#xff0c;我们需要引入 zhipuAI 包&#xff0c;加载 API 密钥&#xff0c;定义 getCompletion 函数。 from zhipuai import ZhipuAIke…...

5G 模组 初始化状态检测

5G 模组 上电检测 5G 模组 上电检测 #终端上电后&#xff0c;待模组正常启动&#xff0c;再进入 控制台。 #vim /etc/profile##新增 until [ -c /dev/ttyUSB1 ] doecho -e "Wait module[5G] up ... "sleep 5 done ##新增The End....

常用的前端框架介绍

在前端开发中&#xff0c;有几个常用的框架技术&#xff0c;它们各自具有独特的特点和优势。 1. React&#xff1a; • 组件化开发&#xff1a;React 鼓励将 UI 拆分成可复用的组件&#xff0c;每个组件负责渲染 UI 的一部分。 • 虚拟 DOM&#xff1a;React 使用虚拟 DOM 来提…...

python飞机大战游戏.py

python飞机大战游戏.py import pygame import random# 游戏窗口大小 WINDOW_WIDTH 600 WINDOW_HEIGHT 800# 颜色定义 BLACK (0, 0, 0) WHITE (255, 255, 255)# 初始化Pygame pygame.init()# 创建游戏窗口 window pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))…...

PPO: 一种通过近端策略优化提高模型性能的方法

PPO: 一种通过近端策略优化提高模型性能的方法 PPO&#xff08;Proximal Policy Optimization&#xff09;是一种强化学习中的策略优化算法&#xff0c;主要用于训练智能体以改善在环境中表现的能力。PPO通过以下几个关键点来提高模型性能&#xff1a; 近端优化&#xff1a;PP…...

Docker创建一个mongodb实例,并用springboot连接 mongodb进行读写文件

一、通过Docker 进行运行一个 mongodb实例 1、拉取镜像 docker pull mongo:5.0.5 2、创建 mongodb容器实例 docker run -d --name mongodb2 \-e MONGO_INITDB_ROOT_USERNAMEsalaryMongo \-e MONGO_INITDB_ROOT_PASSWORD123456 \-p 27017:27017 \mongo:5.0.5 3、进入容器&am…...

[IT项目管理]九.项目质量管理

九&#xff0e;项目质量管理 9.1项目质量管理的重要性 对于很多IT项目的差劲&#xff0c;大多数人只可以忍受。项目质量管理是IT项目管理的重要组成部分&#xff0c;对于提高项目成功率、降低项目成本、提升客户满意度至关重要。尽管很多人对IT项目的质量问题感到无奈&#x…...

Unity中的委托和事件(UnityAction、UnityEvent)

委托和事件 &#x1f392;什么是委托&#xff0c;委托的关键字是Delegate&#xff0c;委托是一种函数的容器&#xff0c;运行将函数做为变量来进行传递 通过Delegate关键字我们声明了一个无参无返回的委托&#xff0c;通过这个委托我们可以存储无参无返回的函数 public deleg…...

图像生成工具WebUI

介绍 Stable Diffusion WebUI&#xff08;AUTOMATIC1111&#xff0c;简称A1111&#xff09;是一个为高级用户设计的图形用户界面&#xff08;GUI&#xff09;&#xff0c;它提供了丰富的功能和灵活性&#xff0c;以满足复杂和高级的图像生成需求。如今各种人工智能满天飞&…...

Python面试常见问题及答案12

问题&#xff1a; 请解释Python中的GIL&#xff08;全局解释器锁&#xff09;是什么&#xff1f; ○ 答案&#xff1a; GIL是Python解释器中的一种机制&#xff0c;用于确保任何时候只有一个线程在执行Python字节码。这在多线程场景下可能影响性能优化&#xff0c;但对于单线程…...

javalock(六)CyclicBarrier

注意&#xff1a;CyclicBarrier不是AQS的派生类&#xff0c;而是CyclicBarrier内部使用了ReentrantLock.Condition 和CountDownLatch一样&#xff0c;都是计数减为0就可以成功获取锁 和CountDownLatch不同的是&#xff1a; 1&#xff1a;CountDownLatch的await和countdown操作…...

React 19有哪些新特性?

写在前面 2024.12.5&#xff0c;React 团队在 react.dev/blog 上发表了帖子 react.dev/blog/2024/1… React 19 正式进入了 stable 状态 React 团队介绍了一些新的特性和 Breaking Changes&#xff0c;并提供了升级指南&#xff0c; React 19: 新更新、新特性和新Hooks Reac…...

大数据治理:构建数据驱动的智慧教学体系

随着大数据技术在教育领域的逐渐渗透&#xff0c;大数据治理在教学中的应用日益广泛&#xff0c;它为提升教学质量、优化教学资源配置以及实现个性化教学提供了有力支持。 一、大数据治理在教学数据管理中的应用 在教学过程中&#xff0c;会产生海量的数据&#xff0c;如学生的…...

梳理你的思路(从OOP到架构设计)_浅尝架构师的滋味03

目录 1、分与合&#xff1a; 强龙与地头蛇的分工 分工 & 合作 分工的时间点 客人来之前做「分」&#xff0c;客人来之后做「合」 2、结语 肯德基餐厅 火锅店 汽车 从分工到外包模式 1、分与合&#xff1a; 强龙与地头蛇的分工 EIT造形用来表达架构师的先「分」与买…...

ChatGPT与领域特定语言的集成

用ChatGPT做软件测试 领域特定语言&#xff08;Domain-Specific Language&#xff0c;DSL&#xff09;是一种编程语言&#xff0c;专门设计用于满足特定领域或问题领域的需求。它是一种定制的语言&#xff0c;通常包括特定领域的专业术语以及相应的语法规则。DSL的设计旨在让领…...

sql server msdb数据库备份恢复

备份 BACKUP DATABASE [msdb] TO DISK ND:\liyuanshuai\test\sqlserver_bakfile\msdb20241219.bak WITH NOFORMAT, NOINIT, NAME Nlys-完整 数据库 备份, SKIP, NOREWIND, NOUNLOAD, COMPRESSION, STATS 10 GO然后删除2个测试的job&#xff0c;停止 SQL Server 代理…...

MyBatis(二)

一、MyBatis 和 JDBC 有什么区别&#xff1f; JDBC 是 Java 访问数据库的基础 API&#xff0c;它需要大量的样板代码。比如&#xff0c;使用 JDBC 进行查询时&#xff0c;需要加载驱动、建立连接、创建语句、执行查询、处理结果集和关闭资源等操作。代码比较繁琐且容易出错。M…...

Docker:Dockerfile(补充四)

这里写目录标题 1. Dockerfile常见指令1.1 DockerFile例子 2. 一些其他命令 1. Dockerfile常见指令 简单的dockerFile文件 FROM openjdk:17LABEL authorleifengyangCOPY app.jar /app.jarEXPOSE 8080ENTRYPOINT ["java","-jar","/app.jar"]# 使…...