当前位置: 首页 > news >正文

【Flink系列】9. Flink容错机制

9. 容错机制

在Flink中,有一套完整的容错机制来保证故障后的恢复,其中最重要的就是检查点。

9.1 检查点(Checkpoint)

在这里插入图片描述

9.1.1 检查点的保存

1)周期性的触发保存

“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以在Flink中,检查点的保存是周期性触发的,间隔时间可以进行设置。

2)保存的时间点

我们应该在所有任务(算子)都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。
这样做可以实现一个数据被所有任务(算子)完整地处理完,状态得到了保存。
如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;kafka就是满足这些要求的一个最好的例子。

3)保存的具体流程

检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。
回忆一下我们最初实现的统计词频的程序——word count。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的是:
“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”…
我们所需要的就是每个任务都处理完“hello”之后保存自己的状态。

9.1.2 从检查点恢复状态

在这里插入图片描述

9.1.3 检查点算法

在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点。

9.1.3.1 检查点分界线(Barrier)

借鉴水位线的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。
这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”(Checkpoint Barrier)。
在这里插入图片描述

9.1.3.2 分布式快照算法(Barrier对齐的精准一次)

watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。
具体实现上,Flink使用了Chandy-Lamport算法的一种变体,被称为“异步分界线快照”算法。算法的核心就是两个原则:
当上游任务向多个并行下游任务发送barrier时,需要广播出去;
而当多个上游任务向同一个下游任务传递分界线时,需要在下游任务执行“分界线对齐”操作,也就是需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。

1)场景说明

在这里插入图片描述

2)检查点保存算法具体过程为:

在这里插入图片描述

(1)触发检查点:JobManager向Source发送Barrier;
(2)Barrier发送:向下游广播发送;
(3)Barrier对齐:下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存;
(4)状态保存:有状态的算子将状态保存至持久化。
(5)先处理缓存数据,然后正常继续处理
完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。
(补充)由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。
为了应对这种场景,Barrier对齐中提供了至少一次语义以及Flink 1.11之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据也保存进检查点。这样,当我们遇到一个分区barrier时就不需等待对齐,而是可以直接启动状态的保存了。

9.1.3.3 分布式快照算法(Barrier对齐的至少一次)

在这里插入图片描述

9.1.3.4 分布式快照算法(非Barrier对齐的精准一次)

在这里插入图片描述

9.1.4 检查点配置

检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置。

9.1.4.1 启用检查点

默认情况下,Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);

这里需要传入一个长整型的毫秒数,表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点,默认的间隔周期为500毫秒,这种方式已经被弃用。
检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小,可以调大间隔时间;而如果希望故障重启后迅速赶上实时的数据处理,就需要将间隔时间设小一些。

9.1.4.2 检查点存储

检查点具体的持久化存储位置,取决于“检查点存储”的设置。默认情况下,检查点存储在JobManager的堆内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口。
具体可以通过调用检查点配置的.setCheckpointStorage()来配置,需要传入一个CheckpointStorage的实现类。Flink主要提供了两种CheckpointStorage:作业管理器的堆内存和文件系统。

// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

对于实际生产应用,我们一般会将CheckpointStorage配置为高可用的分布式文件系统(HDFS,S3等)。

9.1.4.3 其它高级配置

检查点还有很多可以配置的选项,可以通过获取检查点配置(CheckpointConfig)来进行设置。

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
1)常用高级配置
  • 检查点模式(CheckpointingMode)
    设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为exactly-once,而对于大多数低延迟的流处理程序,at-least-once就够用了,而且处理效率会更高。
  • 超时时间(checkpointTimeout)
    用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间。
  • 最小间隔时间(minPauseBetweenCheckpoints)
    用于指定在上一个检查点完成之后,检查点协调器最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,实际并发为1。
  • 最大并发检查点数量(maxConcurrentCheckpoints)
    用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。
  • 开启外部持久化存储(enableExternalizedCheckpoints)
    用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。
    DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。
    RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点。
  • 检查点连续失败次数(tolerableCheckpointFailureNumber)
    用于指定检查点连续失败的次数,当达到这个次数,作业就失败退出。默认为0,这意味着不能容忍检查点失败,并且作业将在第一次报告检查点失败时失败。
2)开启非对齐检查点
  • 非对齐检查点(enableUnalignedCheckpoints)
    不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为exctly-once,并且最大并发的检查点个数为1。
  • 对齐检查点超时时间(alignedCheckpointTimeout)
    该参数只有在启用非对齐检查点的时候有效。参数默认是0,表示一开始就直接用非对齐检查点。如果设置大于0,一开始会使用对齐的检查点,当对齐时间超过该参数设定的时间,则会自动切换成非对齐检查点。

代码中具体设置如下:

public class CheckpointConfigDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);// 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");// TODO 检查点配置// 1、启用检查点: 默认是barrier对齐的,周期为5s, 精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 2、指定检查点的存储位置checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");// 3、checkpoint的超时时间: 默认10分钟checkpointConfig.setCheckpointTimeout(60000);// 4、同时运行中的checkpoint的最大数量checkpointConfig.setMaxConcurrentCheckpoints(1);// 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔,设置了>0,并发就会变成1checkpointConfig.setMinPauseBetweenCheckpoints(1000);// 6、取消作业时,checkpoint的数据 是否保留在外部系统// DELETE_ON_CANCELLATION:主动cancel时,删除存在外部系统的chk-xx目录 (如果是程序突然挂掉,不会删)// RETAIN_ON_CANCELLATION:主动cancel时,外部系统的chk-xx目录会保存下来checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 7、允许 checkpoint 连续失败的次数,默认0--》表示checkpoint一失败,job就挂掉checkpointConfig.setTolerableCheckpointFailureNumber(10);// TODO 开启 非对齐检查点(barrier非对齐)// 开启的要求: Checkpoint模式必须是精准一次,最大并发必须设为1checkpointConfig.enableUnalignedCheckpoints();// 开启非对齐检查点才生效: 默认0,表示一开始就直接用 非对齐的检查点// 如果大于0, 一开始用 对齐的检查点(barrier对齐), 对齐的时间超过这个参数,自动切换成 非对齐检查点(barrier非对齐)checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));env.socketTextStream("hadoop102", 7777).flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1).print();env.execute();}
}
9.1.4.4 通用增量 checkpoint (changelog)

在 1.15 之前,只有RocksDB 支持增量快照。不同于产生一个包含所有数据的全量备份,增量快照中只包含自上一次快照完成之后被修改的记录,因此可以显著减少快照完成的耗时。
Rocksdb状态后端启用增量checkpoint:

EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

从 1.15 开始,不管hashmap还是rocksdb 状态后端都可以通过开启changelog实现通用的增量checkpoint。

1)执行过程

(1)带状态的算子任务将状态更改写入变更日志(记录状态)
在这里插入图片描述

(2)状态物化:状态表定期保存,独立于检查点
在这里插入图片描述

(3)状态物化完成后,状态变更日志就可以被截断到相应的点
在这里插入图片描述

2)注意事项

(1)目前标记为实验性功能,开启后可能会造成资源消耗增大:

  • HDFS上保存的文件数变多
  • 消耗更多的IO带宽用于上传变更日志
  • 更多的CPU用于序列化状态更改
  • TaskManager使用更多内存来缓存状态更改

(2)使用限制:

  • Checkpoint的最大并发必须为1
  • 从 Flink 1.15 开始,只有文件系统的存储类型实现可用(memory测试阶段)
  • 不支持 NO_CLAIM 模式
3)使用方式

(1)方式一:配置文件指定

state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem 
# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog 
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM

(2)方式二:在代码中设置
需要引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-changelog</artifactId><version>${flink.version}</version><scope>runtime</scope>
</dependency>

开启changelog:

env.enableChangelogStateBackend(true);
9.1.4.5 最终检查点

如果数据源是有界的,就可能出现部分Task已经处理完所有数据,变成finished状态,不继续工作。从 Flink 1.14 开始,这些finished状态的Task,也可以继续执行检查点。自 1.15 起默认启用此功能,并且可以通过功能标志禁用它:

Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

9.1.5 保存点(Savepoint)

除了检查点外,Flink还提供了另一个非常独特的镜像保存功能——保存点(savepoint)。
从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。

9.1.5.1 保存点的用途

保存点与检查点最大的区别,就是触发的时机。检查点是由Flink自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。
保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:

  • 版本管理和归档存储
  • 更新Flink版本
  • 更新应用程序
  • 调整并行度
  • 暂停应用程序

需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的,算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定:

DataStream<String> stream = env.addSource(new StatefulSource()).uid("source-id").map(new StatefulMapper()).uid("mapper-id").print();

对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID。

9.1.5.2 使用保存点

保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。

(1)创建保存点

要在命令行中为运行的作业创建一个保存点镜像,只需要执行:

bin/flink savepoint :jobId [:targetDirectory]

这里jobId需要填充要做镜像保存的作业ID,目标路径targetDirectory可选,表示保存点存储的路径。
对于保存点的默认路径,可以通过配置文件flink-conf.yaml中的state.savepoints.dir项来设定:

state.savepoints.dir: hdfs:///flink/savepoints

当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:

env.setDefaultSavepointDir("hdfs:///flink/savepoints");

由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:

bin/flink stop --savepointPath [:targetDirectory] :jobId
(2)从保存点重启应用

我们已经知道,提交启动一个Flink作业,使用的命令是flink run;现在要从保存点重启一个应用,其实本质是一样的:

bin/flink run -s :savepointPath [:runArgs]

这里只要增加一个-s参数,指定保存点的路径就可以了,其它启动时的参数还是完全一样的,如果是基于yarn的运行模式还需要加上 -yid application-id。我们在第三章使用web UI进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置。

9.1.5.3 使用保存点切换状态后端

使用savepoint恢复状态的时候,也可以更换状态后端。但是有一点需要注意的是,不要在代码中指定状态后端了, 通过配置文件来配置或者-D 参数配置。
打包时,服务器上有的就provided,可能遇到依赖问题,报错:javax.annotation.Nullable找不到,可以导入如下依赖:

   <dependency><groupId>com.google.code.findbugs</groupId><artifactId>jsr305</artifactId><version>1.3.9</version></dependency>

(1)提交flink作业

bin/flink run-application -d -t yarn-application -Dstate.backend=hashmap -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar

(2)停止flink作业时,触发保存点
方式一:stop优雅停止并触发保存点,要求source实现StoppableFunction接口

bin/flink stop -p savepoint路径 job-id -yid application-id

方式二:cancel立即停止并触发保存点

bin/flink cancel -s savepoint路径 job-id -yid application-id

案例中source是socket,不能用stop

bin/flink cancel -s hdfs://hadoop102:8020/sp cffca338509ea04f38f03b4b77c8075c -yid application_1681871196375_0001

(3)从savepoint恢复作业,同时修改状态后端

bin/flink run-application -d -t yarn-application -s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 -Dstate.backend=rocksdb -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar    

(4)从保存下来的checkpoint恢复作业

bin/flink run-application -d -t yarn-application -Dstate.backend=rocksdb -s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 -c com.atguigu.checkpoint.SavepointDemo ./FlinkTutorial-1.0-SNAPSHOT.jar

如果停止作业时,忘了触发保存点也不用担心,现在版本的flink支持从保留在外部系统的checkpoint恢复作业,但是恢复时不支持切换状态后端。

9.2 状态一致性

9.2.1 一致性的概念和级别

一致性其实就是结果的正确性,一般从数据丢失、数据重复来评估。
流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。
一般说来,状态一致性有三种级别:

  • 最多一次(At-Most-Once)
  • 至少一次(At-Least-Once)
  • 精确一次(Exactly-Once)

9.2.2 端到端的状态一致性

我们已经知道检查点可以保证Flink内部状态的一致性,而且可以做到精确一次。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问题呢?
没那么简单。在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从Flink内部的状态读取数据,往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑Flink内部数据的处理转换,还涉及到从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。
所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫做“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到at-least-once一致性级别,主要看数据源能够重放数据;而能否达到exactly-once级别,流处理器内部、数据源、外部存储都要有相应的保证机制。

9.3 端到端精确一次(End-To-End Exactly-Once)

实际应用中,最难做到、也最希望做到的一致性语义,无疑就是端到端(end-to-end)的“精确一次”。我们知道,对于Flink内部来说,检查点机制可以保证故障恢复后数据不丢(在能够重放的前提下),并且只处理一次,所以已经可以做到exactly-once的一致性语义了。
所以端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端。
在这里插入图片描述

9.3.1 输入端保证

输入端主要指的就是Flink读取的外部数据源。对于一些数据源来说,并不提供数据的缓冲或是持久化保存,数据被消费之后就彻底不存在了,例如socket文本流。对于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障期间的数据已经不能重发了,这就会导致数据丢失。所以就只能保证at-most-once的一致性语义,相当于没有保证。
想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是Kafka。在Flink的Source任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。
数据源可重放数据,或者说可重置读取数据偏移量,加上Flink的Source算子将偏移量作为状态保存进检查点,就可以保证数据不丢。这是达到at-least-once一致性语义的基本要求,当然也是实现端到端exactly-once的基本要求。

9.3.2 输出端保证

有了Flink的检查点机制,以及可重放数据的外部数据源,我们已经能做到at-least-once了。但是想要实现exactly-once却有更大的困难:数据有可能重复写入外部系统。
因为检查点保存之后,继续到来的数据也会一一处理,任务的状态也会更新,最终通过Sink任务将计算结果输出到外部系统;只是状态改变还没有存到下一个检查点中。这时如果出现故障,这些数据都会重新来一遍,就计算了两次。我们知道对Flink内部状态来说,重复计算的动作是没有影响的,因为状态已经回滚,最终改变只会发生一次;但对于外部系统来说,已经写入的结果就是泼出去的水,已经无法收回了,再次执行写入就会把同一个数据写入两次。
所以这时,我们只保证了端到端的at-least-once语义。
为了实现端到端exactly-once,我们还需要对外部存储系统、以及Sink连接器有额外的要求。能够保证exactly-once一致性的写入方式有两种:

  • 幂等写入
  • 事务写入
    我们需要外部存储系统对这两种写入方式的支持,而Flink也为提供了一些Sink连接器接口。接下来我们进行展开讲解。
1)幂等(Idempotent)写入

所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了。
这相当于说,我们并没有真正解决数据重复计算、写入的问题;而是说,重复写入也没关系,结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入:比如Redis中键值存储,或者关系型数据库(如MySQL)中满足查询条件的更新操作。
需要注意,对于幂等写入,遇到故障进行恢复时,有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据,其实已经写入了一遍,回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据,可能会看到奇怪的现象:短时间内,结果会突然“跳回”到之前的某个值,然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候,最终的结果还是一致的。

2)事务(Transactional)写入

如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方式。
输出端最大的问题,就是写入到外部系统的数据难以撤回。而利用事务就可以实现对已写入数据的撤回。
事务是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消。事务有四个基本特性:原子性、一致性、隔离性和持久性,这就是著名的ACID。
在Flink流处理的结果写入外部系统时,如果能够构建一个事务,让写入操作可以随着检查点来提交和回滚,那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是:用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当Sink任务遇到barrier时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了。
具体来说,又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)

(1)预写日志(write-ahead-log,WAL)

我们发现,事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统,能够实现事务写入呢?
预写日志(WAL)就是一种非常简单的方式。具体步骤是:
①先把结果数据作为日志(log)状态保存起来
②进行检查点保存时,也会将这些结果数据一并做持久化存储
③在收到检查点完成的通知时,将所有结果一次性写入外部系统。
④在成功写入所有数据后,在内部再次确认相应的检查点,将确认信息也进行持久化保存。这才代表着检查点的真正完成。
我们会发现,这种方式类似于检查点完成时做一个批处理,一次性的写入会带来一些性能上的问题;而优点就是比较简单,由于数据提前在状态后端中做了缓存,所以无论什么外部存储系统,理论上都能用这种方式一批搞定。在Flink中DataStream API提供了一个模板类GenericWriteAheadSink,用来实现这种事务型的写入方式。
需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位置。
但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink最终还是会认为没有成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致这批数据的重复写入。

(2)两阶段提交(two-phase-commit,2PC)

前面提到的各种实现exactly-once的方式,多少都有点缺陷;而更好的方法就是传说中的两阶段提交(2PC)。
顾名思义,它的想法是分成两个阶段:先做“预提交”,等检查点完成之后再正式提交。这种提交方式是真正基于事务的,它需要外部系统提供事务支持。
具体的实现步骤为:
①当第一条数据到来时,或者收到检查点的分界线时,Sink任务都会启动一个事务。
②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。
③当Sink任务收到JobManager发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。
当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交(2PC)的方式充分利用了Flink现有的检查点机制:分界线的到来,就标志着开始一个新事务;而收到来自JobManager的checkpoint成功的消息,就是提交事务的指令。每个结果数据的写入,依然是流式的,不再有预写日志时批处理的性能问题;最终提交时,也只需要额外发送一个确认信息。所以2PC协议不仅真正意义上实现了exactly-once,而且通过搭载Flink的检查点机制来实现事务,只给系统增加了很少的开销。
Flink提供了TwoPhaseCommitSinkFunction接口,方便我们自定义实现两阶段提交的SinkFunction的实现,提供了真正端到端的exactly-once保证。新的Sink架构,使用的是TwoPhaseCommittingSink接口。
不过两阶段提交虽然精巧,却对外部系统有很高的要求。这里将2PC对外部系统的要求列举如下:

  • 外部系统必须提供事务支持,或者Sink任务必须能够模拟外部系统上的事务。
  • 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。
  • 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。
  • Sink任务必须能够在进程失败后恢复事务。
  • 提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。

可见,2PC在实际应用同样会受到比较大的限制。具体在项目中的选型,最终还应该是一致性级别和处理性能的权衡考量。

9.3.3 Flink和Kafka连接时的精确一次保证

在流处理的应用中,最佳的数据源当然就是可重置偏移量的消息队列了;它不仅可以提供数据重放的功能,而且天生就是以流的方式存储和处理数据的。所以作为大数据工具中消息队列的代表,Kafka可以说与Flink是天作之合,实际项目中也经常会看到以Kafka作为数据源和写入的外部系统的应用。在本小节中,我们就来具体讨论一下Flink和Kafka连接时,怎样保证端到端的exactly-once状态一致性。
在这里插入图片描述

1)整体介绍

既然是端到端的exactly-once,我们依然可以从三个组件的角度来进行分析:

(1)Flink内部

Flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义。

(2)输入端

输入数据源端的Kafka可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我们可以在Source任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器FlinkKafkaConsumer向Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。

(3)输出端

输出端保证exactly-once的最佳实现,当然就是两阶段提交(2PC)。作为与Flink天生一对的Kafka,自然需要用最强有力的一致性保证来证明自己。
也就是说,我们写入Kafka的过程实际上是一个两段式的提交:处理完毕得到结果,写入Kafka时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

2)需要的配置

在具体应用中,实现真正的端到端exactly-once,还需要有一些额外的配置:
(1)必须启用检查点
(2)指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE
(3)配置Kafka读取数据的消费者的隔离级别
这里所说的Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而Kafka中默认的隔离级别isolation.level是read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置
为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
(4)事务超时配置
Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时,而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时,有可能出现Kafka已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。

public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");// TODO 1、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// TODO 2.读取kafkaKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setGroupId("atguigu").setTopics("topic_1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSource<String> kafkasource = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");/*** TODO 3.写出到Kafka* 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint* 2、sink设置保证级别为 精准一次* 3、sink设置事务前缀* 4、sink设置事务超时时间: checkpoint间隔 <  事务超时时间  < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// TODO 3.1 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// TODO 3.2 精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// TODO 3.3 精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();kafkasource.sinkTo(kafkaSink);env.execute();}
}

后续读取“ws”这个topic的消费者,要设置事务的隔离级别为“读已提交”,如下:

public class KafkaEOSDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用两阶段提交写入的TopicKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setGroupId("atguigu").setTopics("ws").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// TODO 作为 下游的消费者,要设置 事务的隔离级别 = 读已提交.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").build();env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource").print();env.execute();}
}

相关文章:

【Flink系列】9. Flink容错机制

9. 容错机制 在Flink中&#xff0c;有一套完整的容错机制来保证故障后的恢复&#xff0c;其中最重要的就是检查点。 9.1 检查点&#xff08;Checkpoint&#xff09; 9.1.1 检查点的保存 1&#xff09;周期性的触发保存 “随时存档”确实恢复起来方便&#xff0c;可是需要我…...

【物联网】ARM核介绍

文章目录 一、芯片产业链1. CPU核(1)ARM(2)MIPS(3)PowerPc(4)Intel(5)RISC-V 2. SOC芯片(1)主流厂家(2)产品解决方案 3. 产品 二、ARM核发展1. 不同架构的特点分析(1)VFP(2)Jazelle(3)Thumb(4)TrustZone(5)SIMD(6)NEON 三、ARM核(ARMv7)工作模式1. 权限级别(privilege level)2.…...

spring的事物管理的认知

事物 它是一个原子操作要么全部不执行&#xff0c;要么全部执行成功&#xff0c;如果有一个失败也会撤销&#xff0c;它保证用户每一次的操作都是可靠的&#xff0c;即使时出现了错误也不至于破坏数据的完整性 它包含了四种特性&#xff1a; 原子性&#xff1a;保证事物要么…...

QT跨平台应用程序开发框架(3)—— 信号和槽

目录 一&#xff0c;基本概念 二&#xff0c;connect函数使用 2.1 connect 2.2 Qt内置信号和槽 2.3 一些细节 三&#xff0c;自定义信号和槽 3.1 自定义槽函数 3.2 自定义信号 3.3 带参数的信号槽 四&#xff0c;信号和槽的意义 五&#xff0c;信号和槽断开连接 六&…...

技术面试中的软素质技巧性答复集锦

1、请你自我介绍一下你自己&#xff1f; 回答提示&#xff1a;一般人回答这个问题过于平常&#xff0c;只说姓名、年龄、爱好、工作经验&#xff0c;这些在简历上都有。其实&#xff0c;企业最希望知道的是求职者能否胜任工作&#xff0c;包括&#xff1a;最强的技能、最深入研…...

JavaWeb项目——如何处理管理员登录和退出——笔记

一、知识点 1、WebServlet注解的使用 WebServlet注解是Servlet 3.0引入的一个特性&#xff0c;它允许开发者在Servlet类上使用注解来声明Servlet的一些属性&#xff0c;从而避免在web.xml文件中进行配置。这种方式简化了Servlet的配置过程&#xff0c;使得代码更加简洁&#…...

函数递归的介绍

1.递归的定义 在C语言中&#xff0c;递归就是函数自己调用自己 上面的代码就是 main 函数在函数主体内 自己调用自己 但是&#xff0c;上面的代码存在问题&#xff1a;main 函数反复地 自己调用自己 &#xff0c;不受限制&#xff0c;停不下来。 最终形成死递归&#xff0c;…...

昇腾环境ppstreuct部署问题记录

测试代码 我是在华为昇腾910B3上测试的PPStructure。 import os import cv2 from PIL import Image #from paddleocr import PPStructure,draw_structure_result,save_structure_res from paddleocr_asyncio import PPStructuretable_engine PPStructure(show_logTrue, imag…...

《知识图谱:鸿蒙NEXT中人工智能的智慧基石》

在鸿蒙NEXT系统的人工智能应用中&#xff0c;知识图谱技术犹如一座智慧基石&#xff0c;为系统的智能化提供了强大的知识支撑&#xff0c;开启了更智能、更高效、更个性化的交互新时代。 提升语义理解能力 知识图谱以其结构化的知识表示方式&#xff0c;将各种实体和它们之间…...

Springboot项目Jackson支持多种接收多种时间格式

前言 在springboot项目中经常会使用Jackson框架,当前端给后端传输时间类型时,我们一般需要先配置好时间格式,否则后端无法接收。以下是一些配置方法 统一配置 spring:jackson:time-zone: GMT+8date-format: yyyy-MM-dd HH:mm:ss这种配置就是要求前端统一传输的格式是yyyy-…...

go语言zero框架通过chromedp实现网页在线截图的设计与功能实现

在 GoZero 框架中实现网页在线截图的功能&#xff0c;可以通过集成 chromedp 库来控制 Chrome 浏览器进行截图。chromedp 是一个基于 Chrome DevTools 协议的 Go 包&#xff0c;可以用来在 Go 程序中模拟浏览器操作&#xff0c;如页面截图、DOM 操作、表单提交等。 下面是一个…...

基于深度学习的视觉检测小项目(十四) 用SQLite数据库进行用户管理

在开始做用户管理之前&#xff0c;先要了解一下SQLite数据库的基础知识&#xff1a;https://blog.csdn.net/xulibo5828/category_12785993.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12785993&sharereferPC&sharesourcexulibo5828&sharefrom…...

【2024年华为OD机试】 (B卷,100分)- 敏感字段加密(Java JS PythonC/C++)

一、问题描述 题目描述 给定一个由多个命令字组成的命令字符串: 字符串长度小于等于 127 字节,只包含大小写字母、数字、下划线和偶数个双引号;命令字之间以一个或多个下划线 _ 进行分割;可以通过两个双引号 "" 来标识包含下划线 _ 的命令字或空命令字(仅包含…...

图像去雾数据集的下载和预处理操作

前言 目前&#xff0c;因为要做对比实验&#xff0c;收集了一下去雾数据集&#xff0c;并且建立了一个数据集的预处理工程。 这是以前我写的一个小仓库&#xff0c;我决定还是把它用起来&#xff0c;下面将展示下载的路径和数据处理的方法。 下面的代码均可以在此找到。Auo…...

Vue3数据响应式原理

什么是数据响应式 当数据变化时&#xff0c;引用数据的函数&#xff08;副作用函数&#xff09;自动重新执行。 即数据触发了函数的响应&#xff0c;如&#xff1a;视图渲染中使用了某数据&#xff0c;数据改变后&#xff0c;视图跟着自动更新。 触发者&#xff1a;数据 响应者…...

5.最长回文子串--力扣

给你一个字符串 s&#xff0c;找到 s 中最长的 回文子串。 示例 1&#xff1a; 输入&#xff1a;s “babad” 输出&#xff1a;“bab” 解释&#xff1a;“aba” 同样是符合题意的答案。 示例 2&#xff1a; 输入&#xff1a;s “cbbd” 输出&#xff1a;“bb” 原题如上&…...

ChatGPT大模型极简应用开发-CH1-初识 GPT-4 和 ChatGPT

文章目录 1.1 LLM 概述1.1.1 语言模型和NLP基础1.1.2 Transformer及在LLM中的作用1.1.3 解密 GPT 模型的标记化和预测步骤 1.2 GPT 模型简史&#xff1a;从 GPT-1 到 GPT-41.2.1 GPT11.2.2 GPT21.2.3 GPT-31.2.4 从 GPT-3 到 InstructGPT1.2.5 GPT-3.5、Codex 和 ChatGPT1.2.6 …...

python学opencv|读取图像(三十九 )阈值处理Otsu方法

【1】引言 前序学习了5种阈值处理方法&#xff0c;包括(反)阈值处理、(反)零值处理和截断处理&#xff0c;还学习了一种自适应处理方法&#xff0c;相关文章链接为&#xff1a; python学opencv|读取图像&#xff08;三十三&#xff09;阈值处理-灰度图像-CSDN博客 python学o…...

统信V20 1070e X86系统编译安装mysql-5.7.44版本以及主从构建

设备信息 操作系统版本架构CPU内存备注统信UOS V20 1070eX864C8G此配置仅做编译安装验证&#xff0c;持续运行或数据量增长大请自行评估资源配置。统信UOS V20 1070eX864C8G 资源包 该包包含mysql-5.7.44源码包、boost资源包、统信编译mysql-5.7.44安装包 通过网盘分享的文件…...

麒麟LINUX V10SP3 2401安装ORACLE 12.2.1 runInstaller直接报UNZIP格式不对

好久没有安装ORACLE了&#xff0c;一般都是RHEL上安装得比较多&#xff0c;这不&#xff0c;现在大家都是选择国产操作系统来安装数据库了&#xff0c;以前在龙蜥&#xff0c;欧拉&#xff0c;麒麟上也安装过&#xff0c;都没有问题&#xff0c;想来在麒麟LINUX v10sp3 2401上面…...

10 为什么系统需要引入分布式、微服务架构

java技术的发展 在java开始流行起来之后&#xff0c;主要服务于企业家应用&#xff0c;例如ERP,CRM等等&#xff0c;这些项目是为企业内部员工使用&#xff0c;我们的思维是怎么用设计模式&#xff0c;如何封装代码。让开发人员关注到业务上去&#xff0c;系统也就那么几十几百…...

【Web】2025西湖论剑·中国杭州网络安全安全技能大赛题解(全)

目录 Rank-l Rank-U sqli or not Rank-l username存在报错回显&#xff0c;发现可以打SSTI 本地起一个服务&#xff0c;折半查找fuzz黑名单&#xff0c;不断扔给fenjing去迭代改payload from flask import Flask, request, render_template_stringapp Flask(__name__)app…...

openharmony应用开发快速入门

开发准备 本文档适用于OpenHarmony应用开发的初学者。通过构建一个简单的具有页面跳转/返回功能的应用&#xff08;如下图所示&#xff09;&#xff0c;快速了解工程目录的主要文件&#xff0c;熟悉OpenHarmony应用开发流程。 在开始之前&#xff0c;您需要了解有关OpenHarmon…...

解决npm install安装出现packages are looking for funding run `npm fund` for details问题

当我们运行npm install时&#xff0c;可能会收到类似以下的提示信息&#xff1a;“x packages are looking for funding.” 这并不是错误提示&#xff0c;也不会影响项目的正常运行。其实实在提醒有一些软件包正在寻求资金支持。 根据提示输入npm fund可以查看详细的信息&#…...

python助力WRF自动化运行

对大部分人而言&#xff0c;特别是新用户&#xff0c;WRF模式的安装繁琐且不必要&#xff0c;可以作为后续进阶掌握的技能&#xff0c;本学习跳过繁琐的安装步骤&#xff0c;直接聚焦模式的运行部分&#xff0c;通过短平快的教学&#xff0c;快速掌握模式运行。进一步将python语…...

Go-知识 版本演进

Go-知识 版本演进 Go release notesr56(2011/03/16)r57(2011/05/03)Gofix 工具语言包工具小修订 r58(2011/06/29)语言包工具小修订 r59(2011/08/01)语言包工具 r60(2011/09/07)语言包工具 [go1 2012-03-28](https://golang.google.cn/doc/devel/release#go1)[go1.1 2013-05-13]…...

企业级NoSQL数据库Redis

1.浏览器缓存过期机制 1.1 最后修改时间 last-modified 浏览器缓存机制是优化网页加载速度和减少服务器负载的重要手段。以下是关于浏览器缓存过期机制、Last-Modified 和 ETag 的详细讲解&#xff1a; 一、Last-Modified 头部 定义&#xff1a;Last-Modified 表示服务器上资源…...

Android渲染Latex公式的开源框架比较

对比主流框架&#xff0c;介绍如下几款 1、AndroidMath 官网&#xff1a;https://github.com/gregcockroft/AndroidMath/tree/master 基于android原生view方式渲染 优点&#xff1a;速度快&#xff0c;开源协议 MIT license 缺点&#xff1a;不支持文字公式混合渲染 2、Ma…...

ARM学习(42)CortexM3/M4 MPU配置

笔者之前学习过CortexR5的MPU配置,现在学习一下CortexM3/M4 MPU配置 1、背景介绍 笔者在工作中遇到NXP MPU在访问异常地址时,就会出现总线挂死,所以需要MPU抓住异常,就需要配置MPU。具体背景情况可以参考ARM学习(41)NXP MCU总线挂死,CPU could not be halted以及无法连…...

Sam Altman亲自确认:o3-mini即将上线!GPT和o系列模型合并!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;专注于分享AI全维度知识&#xff0c;包括但不限于AI科普&#xff0c;AI工…...

数据结构-队列

目录 前言一、队列及其抽象数据类型1.1 队列的基本概念1.2 队列的抽象数据类型 二、队列的实现2.1 顺序表示2.1.1 结构定义2.1.2 基本操作的实现 2.2 链式表示2.2.1 结构定义2.2.2 基本操作的实现 总结 前言 本篇文章介绍队列的基础知识&#xff0c;包括队列的抽象数据类型以及…...

Go Map 源码分析(一)

Go语言中的map是通过哈希表实现的&#xff0c;其底层结构和实现机制如下&#xff1a; 一、hash 结构 hmap结构体&#xff1a;是map的头部结构&#xff0c;主要字段及含义如下&#xff1a; count&#xff1a;表示当前哈希表中的元素数量&#xff0c;与len()函数相对应。flags…...

天机学堂5-XxlJobRedis

文章目录 梳理前面的实现&#xff1a;Feign点赞改进 day07-积分系统bitmap相关命令签到增加签到记录计算本月已连续签到的天数查询签到记录 积分表设计签到-->发送RabbitMQ消息&#xff0c;保存积分对应的消费者&#xff1a;**消费消息 用于保存积分**增加积分查询个人今日积…...

SpringBoot整合junit

SpringBoot 整合 junit 特别简单&#xff0c;分为以下三步完成: 1在测试类上添加 SpringBootTest 注解2使用 Autowired 注入要测试的资源3定义测试方法进行测试 1.实验准备&#xff1a; 创建一个名为 springboot_junit_test 的 SpringBoot 工程&#xff0c;工程目录结构如下…...

Jenkins-pipeline Jenkinsfile说明

一. 简介&#xff1a; Jenkinsfile 是一个文本文件&#xff0c;通常保存在项目的源代码仓库中&#xff0c;用于定义 Jenkins Pipeline 的行为。使用 Jenkinsfile 可以使 CI/CD 流程版本化&#xff0c;并且易于共享和审核。 二. 关于jenkinsfile&#xff1a; jenkins的pipeline…...

SpringMVC 实战指南:打造高效 Web 应用的秘籍

第一章&#xff1a;三层架构和MVC 三层架构&#xff1a; 开发服务器端&#xff0c;一般基于两种形式&#xff0c;一种 C/S 架构程序&#xff0c;一种 B/S 架构程序使用 Java 语言基本上都是开发 B/S 架构的程序&#xff0c;B/S 架构又分成了三层架构三层架构&#xff1a; 表现…...

结合帧级边界检测和深度伪造检测,定位部分伪造音频攻击中的篡改区域

Integrating frame-level boundary detection and deepfake detection for locating manipulated regions in partially spoofed audio forgery 摘要&#xff1a; 部分伪造音频是一种深度伪造的变体&#xff0c;它通过引入伪造或外部来源的善意音频片段来操纵音频语句&#xf…...

人工智能之深度学习_[2]-PyTorch入门

文章目录 PyTorch1.PyTorch简介1.1 什么是PyTorch1.2 PyTorch特点1.3 PyTorch发展历史 2 张量创建2.1 什么是张量2.2 基本创建方式2.3 线性和随机张量2.4 0、1、指定值张量2.5 指定元素类型张量 3 张量类型转换3.1 张量转换为NumPy数组3.2 NumPy数组转换为张量3.3 提取标量张量…...

vue2与vue3的区别

目录 1. 性能 2. 组合式 API 3. 生命周期钩子 4. 片段&#xff08;Fragments&#xff09; 5. 递归组件 6. 自定义渲染器 7. 全局 API 8. 组件内部的 this 9. 模板语法 10. 兼容性 总结 Vue 2 和 Vue 3 是 Vue.js 框架的两个主要版本&#xff0c;它们在多个方面有所不…...

八股学习 Mysql

八股学习 Mysql 常见面试问题优化其他 定位慢查询方案一&#xff1a;开源工具方案二&#xff1a;MySQL自带慢日志 SQL执行计划示例场景名词解释 索引概念底层数据结构聚簇索引、二级索引&#xff08;非聚簇索引&#xff09;覆盖索引覆盖索引应用场景创建原则索引失效 SQL优化表…...

主从复制

简述mysql 主从复制原理及其工作过程&#xff0c;配置一主两从并验证。 主从原理&#xff1a;MySQL 主从同步是一种数据库复制技术&#xff0c;它通过将主服务器上的数据更改复制到一个或多个从服务器&#xff0c;实现数据的自动同步。 主从同步的核心原理是将主服务器上的二…...

服务器数据恢复—Zfs文件系统数据恢复案例

服务器数据恢复环境&故障&#xff1a; 一台zfs文件系统的服务器&#xff0c;管理员误操作删除了服务器上的数据。 服务器数据恢复过程&#xff1a; 1、将故障服务器中所有硬盘做好标记后取出&#xff0c;硬件工程师检测后没有发现有硬盘存在硬件故障。以只读方式将所有硬盘…...

Linux安装docker,安装配置xrdp远程桌面

Linux安装docker&#xff0c;安装配置xrdp远程桌面。 1、卸载旧版本docker 卸载旧版本docker命令 yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine现在就是没有旧版本的d…...

Windows11电脑总是一闪一闪的,黑一下亮一些怎么解决

Windows11电脑总是一闪一闪的&#xff0c;黑一下亮一些怎么解决 1. 打开设备管理器2. 点击显示适配器3. 更新下方两个选项的驱动3.1 更新驱动Inter(R) UHD Graphixs3.2 更新驱动NVIDIA GeForce RTX 4060 Laptop GPU 4. 其他文章快来试试吧&#x1f970; 1. 打开设备管理器 在电…...

Low-Level 大一统:如何使用Diffusion Models完成视频超分、去雨、去雾、降噪等所有Low-Level 任务?

Diffusion Models专栏文章汇总:入门与实战 前言:视频在传输过程中常常因为各种因素(如恶劣天气、噪声、压缩和传感器分辨率限制)而出现质量下降,这会严重影响计算机视觉任务(如目标检测和视频监控)的性能。现有的视频修复方法虽然取得了一些进展,但通常只能针对特定的退…...

使用 Blazor 和 Elsa Workflows 作为引擎的工作流系统开发

开发一个完整的工作流系统使用 Blazor 和 Elsa Workflows 作为引擎&#xff0c;可以实现一个功能强大的工作流管理和设计系统。下面将提供详细的步骤和代码实现&#xff0c;展示如何在 Blazor 中开发一个基于 Elsa Workflows 的工作流系统。 项目概述 我们的工作流系统将包含以…...

调试Hadoop源代码

个人博客地址&#xff1a;调试Hadoop源代码 | 一张假钞的真实世界 Hadoop版本 Hadoop 2.7.3 调试模式下启动Hadoop NameNode 在${HADOOP_HOME}/etc/hadoop/hadoop-env.sh中设置NameNode启动的JVM参数&#xff0c;如下&#xff1a; export HADOOP_NAMENODE_OPTS"-Xdeb…...

mkv转码mp4(ffmpeg工具)

基于windows&#xff0c;Linux也可以用&#xff0c;都是命令行 下载路径&#xff08;https://github.com/BtbN/FFmpeg-Builds/releases&#xff09; 下载安装包&#xff1a;ffmpeg-n6.1-latest-win64-lgpl-6.1.zip&#xff0c;&#xff08;根据自己的平台选择下载&#xff09;并…...

前端项目搭建和基础配置

这个模块主要是介绍从零开始搭建项目的一些操作&#xff0c;包含一些前端常用的配置&#xff0c;这里只是一部分&#xff0c;会在后续的文章中逐步进行补充和完善 一、创建项目 在项目路径下使用以下命令生成前后端项目 npm create vite输入项目名称&#xff0c;框架选择Vue…...

计算机网络 (49)网络安全问题概述

前言 计算机网络安全问题是一个复杂且多维的领域&#xff0c;它涉及到网络系统的硬件、软件以及数据的安全保护&#xff0c;确保这些元素不因偶然的或恶意的原因而遭到破坏、更改或泄露。 一、计算机网络安全的定义 计算机网络安全是指利用网络管理控制和技术措施&#xff0c;保…...