当前位置: 首页 > news >正文

Debezium日常分享系列之:Debezium Engine

Debezium日常分享系列之:Debezium Engine

  • 依赖
  • 打包项目
  • 在代码中
  • 输出消息格式
  • 消息转换
  • 消息转换谓词
  • 高级记录使用
  • 引擎属性
  • 异步引擎属性
  • 数据库模式历史属性
  • 处理故障

Debezium连接器通常通过部署到Kafka Connect服务来运行,并配置一个或多个连接器来监视上游数据库,并为上游数据库中的所有更改生成数据变更事件。这些数据变更事件被写入Kafka,可以由许多不同的应用程序独立消费。Kafka Connect提供了出色的容错性和可伸缩性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终运行。例如,即使集群中的一个Kafka Connect端点关闭,剩余的Kafka Connect端点也会重新启动之前在已终止端点上运行的任何连接器,从而最大程度地减少停机时间并消除管理活动。

并非每个应用程序都需要这种级别的容错性和可靠性,他们可能不想依赖外部的Kafka代理和Kafka Connect服务。相反,一些应用程序更愿意直接在应用程序空间中嵌入Debezium连接器。它们仍然需要相同的数据变更事件,但更希望连接器直接将其发送到应用程序而不是在Kafka中持久化。
这个debezium-api模块定义了一个小的API,允许应用程序使用Debezium Engine轻松配置和运行Debezium连接器。

从2.6.0版本开始,Debezium提供了两个DebeziumEngine接口的实现。较旧的EmbeddedEngine实现运行一个只使用一个任务的连接器。连接器按顺序发出所有记录。这是默认的实现。

从2.6.0版本开始,还提供了一个新的AsyncEmbeddedEngine实现。这个实现也只运行一个连接器,但它可以在多个线程中处理记录,并运行多个任务,如果连接器支持的话(目前只有SQL Server和MongoDB的连接器支持在一个连接器中运行多个任务)。由于这两个引擎实现了相同的接口并共享相同的API,下面的代码示例对于任何引擎都是有效的。这两个实现支持相同的配置选项。

然而,新的AsyncEmbeddedEngine提供了一些用于设置和优化并行处理的新配置选项。

依赖

要使用Debezium Engine模块,将debezium-api模块添加到应用程序的依赖项中。还应将debezium-embedded模块添加到依赖项中,这是该API的一个开箱即用的实现。对于Maven,这需要将以下内容添加到应用程序的POM文件中:

<dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${version.debezium}</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${version.debezium}</version>
</dependency>

其中${version.debezium}可以是您使用的Debezium版本,也可以是一个包含Debezium版本字符串的Maven属性的值。

同样,为您的应用程序将使用的每个Debezium连接器添加依赖项。例如,可以将以下内容添加到您的应用程序的Maven POM文件中,以便您的应用程序可以使用MySQL连接器:

<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${version.debezium}</version>
</dependency>

或者对于 MongoDB 连接器:

<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mongodb</artifactId><version>${version.debezium}</version>
</dependency>

本文档的其余部分介绍了如何在应用程序中嵌入 MySQL 连接器。其他连接器的使用方式类似,但连接器特定的配置、主题和事件除外。

打包项目

Debezium使用SPI通过ServiceLoader加载实现。实现可以基于连接器类型,也可以是自定义实现。

有些接口有多个实现。例如,io.debezium.snapshot.spi.SnapshotLock在核心中有一个默认实现,并且针对每个连接器有特定的实现。为了确保Debezium可以定位所需的实现,必须显式地配置构建工具以合并META-INF/services文件。

例如,如果使用的是Maven shade插件,请添加ServicesResourceTransformer转换器,如下例所示:

...
<configuration><transformers>...<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />...</transformers>
...
</configuration>

或者,如果您使用 Maven Assembly 插件,则可以使用 metaInf-services 容器描述符处理程序。

在代码中

您的应用程序需要为每个要运行的连接器实例设置一个嵌入式引擎。io.debezium.engine.DebeziumEngine<‍R‍>类作为一个易于使用的包装器,完全管理连接器的生命周期。您可以使用它的构建器API创建DebeziumEngine实例,提供以下内容:

  • 您希望以哪种格式接收消息,例如JSON、Avro或Kafka Connect SourceRecord(见链接)
  • 配置属性(可能从属性文件中加载),用于定义引擎和连接器的环境
  • 一个方法,该方法将被调用以处理连接器产生的每个数据变更事件

以下是一个配置和运行嵌入式引擎的示例代码:

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying(record -> {System.out.println(record);}).build()) {// Run the engine asynchronously ...ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);// Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished

让我们更详细地研究这段代码,从我们在这里重复的前几行开始:

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");

这将创建一个新的标准Properties对象,用于设置引擎所需的几个字段,无论使用哪个连接器。第一个字段是引擎的名称,它将在连接器产生的源记录和内部状态中使用,因此在应用程序中使用一些有意义的名称。
connector.class字段定义了扩展Kafka Connect org.apache.kafka.connect.source.SourceConnector抽象类的类名;在此示例中,我们指定了Debezium的MySqlConnector类。

当Kafka Connect连接器运行时,它会从源中读取信息,并定期记录定义了它已经处理了多少信息的"偏移量"。如果连接器重新启动,它将使用最后记录的偏移量来确定在源信息中应该从哪里恢复读取。由于连接器不知道也不关心偏移量的存储方式,因此引擎需要提供一种存储和恢复这些偏移量的方式。我们的配置的下几个字段指定了我们的引擎应该使用FileOffsetBackingStore类将偏移量存储在本地文件系统上的/path/to/storage/offset.dat文件中(文件可以任意命名和存储在任何位置)。此外,尽管连接器在生成每个源记录时记录偏移量,但引擎会定期将偏移量刷新到后备存储(在我们的示例中,每分钟刷新一次)。这些字段可以根据您的应用程序需要进行调整。

接下来的几行定义了特定于连接器的字段(在每个连接器文档中有记录),在我们的示例中是MySqlConnector连接器:

 /* begin connector properties */props.setProperty("database.hostname", "localhost")props.setProperty("database.port", "3306")props.setProperty("database.user", "mysqluser")props.setProperty("database.password", "mysqlpw")props.setProperty("database.server.id", "85744")props.setProperty("topic.prefix", "my-app-connector")props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat")

在这里,我们设置了MySQL数据库服务器运行的主机机器的名称和端口号,并定义了将用于连接到MySQL数据库的用户名和密码。请注意,对于MySQL,用户名和密码应对应于已被授予以下MySQL权限的MySQL数据库用户:

  • SELECT
  • RELOAD
  • SHOW DATABASES
  • REPLICATION SLAVE
  • REPLICATION CLIENT

在读取数据库的一致快照时,需要前三个权限。最后两个权限允许数据库读取通常用于MySQL复制的服务器的binlog。

该配置还包括一个用于MySQL的数值标识符。由于MySQL的binlog是MySQL复制机制的一部分,因此为了读取binlog,MySqlConnector实例必须加入MySQL服务器组,这意味着该服务器ID必须是1到232-1之间的任意整数。在我们的代码中,我们将其设置为一个相当大但有些随机的值,仅供我们的应用程序使用。

该配置还指定了MySQL服务器的逻辑名称。连接器将此逻辑名称包含在其生成的每个源记录的主题字段中,使您的应用程序能够区分这些记录的来源。我们的示例使用了一个名为"products"的服务器名称,这可能是因为数据库包含产品信息。当然,您可以为您的应用程序命名任何有意义的名称。

当MySqlConnector类运行时,它会读取MySQL服务器的binlog,其中包括对由服务器托管的数据库所做的所有数据更改和模式更改。由于所有数据更改都是基于拥有表格的模式结构化的,因此连接器需要跟踪所有模式更改,以便可以正确解码更改事件。连接器记录模式信息,以便如果连接器重新启动并恢复从最后记录的偏移量读取,它知道该偏移量时数据库模式的确切外观。连接器如何记录数据库模式历史记录在我们的配置的最后两个字段中定义,即我们的连接器应该使用FileSchemaHistory类将数据库模式历史更改存储在本地文件系统上的/path/to/storage/schemahistory.dat文件中(同样,此文件可以任意命名和存储在任何位置)。

最后,使用build()方法构建不可变配置。(顺便说一下,我们可以使用Configuration.read(…)方法之一从属性文件中读取配置,而不是通过编程方式构建它。)

现在我们有了一个配置,我们可以创建引擎。以下是相关的代码行:

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying(record -> {System.out.println(record);}).build()) {
}

所有的更改事件都将传递给给定的处理方法,该方法必须与java.util.function.Consumer<‍R‍>函数接口的签名匹配,其中<‍R‍>必须与调用create()时指定的格式类型匹配。请注意,您的应用程序的处理函数不应抛出任何异常;如果抛出异常,引擎将记录方法抛出的任何异常,并继续处理下一个源记录,但您的应用程序将没有机会处理导致异常的特定源记录,这意味着您的应用程序可能与数据库不一致。

此时,我们有一个已配置并准备运行的DebeziumEngine对象,但它什么也不做。DebeziumEngine设计为由Executor或ExecutorService异步执行:

// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);// Do something else or wait for a signal or an event

您的应用程序可以通过调用其 close() 方法来安全、优雅地停止引擎:

// At some later time ...
engine.close();

或者,由于引擎支持Closeable接口,当离开try块时,它将被自动调用。
引擎的连接器将停止从源系统读取信息,将所有剩余的更改事件转发给处理函数,并将最新的偏移量刷新到偏移量存储中。只有在所有这些操作完成后,引擎的run()方法才会返回。如果您的应用程序需要在退出之前等待引擎完全停止,您可以使用ExecutorService的shutdown和awaitTermination方法来实现:

try {executor.shutdown();while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {logger.info("Waiting another 5 seconds for the embedded engine to shut down");}
}
catch ( InterruptedException e ) {Thread.currentThread().interrupt();
}

或者,您可以在创建DebeziumEngine时注册CompletionCallback作为回调函数,以便在引擎终止时得到通知。

请记住,当JVM关闭时,它只会等待非守护线程。因此,当您在守护线程上运行引擎时,如果您的应用程序退出,请确保等待引擎进程完成。

为了确保优雅和完全的关闭,并确保每个源记录仅发送一次到应用程序,您的应用程序应始终正确停止引擎。例如,不要依赖于关闭ExecutorService,因为这会中断运行的线程。虽然当线程被中断时,DebeziumEngine确实会终止,但引擎可能无法完全终止,并且当您的应用程序重新启动时,它可能会看到在关闭之前处理的一些相同的源记录。

正如前面提到的,DebeziumEngine接口有两个实现。这两个实现使用相同的API,前面的代码示例对两个版本都有效。唯一的例外是创建DebeziumEngine实例。正如在介绍中提到的,默认情况下使用EmbeddedEngine实现。因此,DebeziumEngine.create(Json.class)方法在内部使用EmbeddedEngine实例。

如果您想使用新的AsyncEmbeddedEngine实例,可以使用以下方法:DebeziumEngine#create(KeyValueHeaderChangeEventFormat<‍K, V, H‍> format, String builderFactory)

例如,要创建一个使用AsyncEmbeddedEngine并以JSON作为其键、值和标头格式的嵌入式引擎,您可以使用以下代码:

try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),"io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory").using(props).notifying(record -> {System.out.println(record);}).build()) {// Also run the engine asynchronously ...ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);// Do something else or wait for a signal or an event
}

输出消息格式

DebeziumEngine#create()可以接受多个不同的参数,这些参数会影响消息被消费者接收的格式。允许的值为:

  • Connect.class - 输出值是包装Kafka Connect的SourceRecord的变更事件
  • Json.class - 输出值是键和值对,编码为JSON字符串
  • JsonByteArray.class - 输出值是键和值对,格式化为JSON并编码为UTF-8字节数组
  • Avro.class - 输出值是以Avro序列化记录编码的键和值对
  • CloudEvents.class - 输出值是编码为 消息的键和值对

在调用DebeziumEngine#create()时也可以指定标头格式。允许的值为:

  • Json.class - 标头值被编码为JSON字符串
  • JsonByteArray.class - 标头值被格式化为JSON并编码为UTF-8字节数组

在内部,引擎将数据转换委托给Kafka Connect或Apicurio转换器实现,使用最适合执行转换的算法。可以使用引擎属性对转换器进行参数化以修改其行为。JSON输出格式的示例:

final Properties props = new Properties();
...
props.setProperty("converter.schemas.enable", "false"); // don't include schema in message
...
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying((records, committer) -> {for (ChangeEvent<String, String> r : records) {System.out.println("Key = '" + r.key() + "' value = '" + r.value() + "'");committer.markProcessed(r);}
...

其中 ChangeEvent 数据类型是键/值对。

消息转换

在将消息传递给处理程序之前,可以通过Kafka Connect的简单消息转换(SMT)管道运行它们。每个SMT可以将消息保持不变、修改消息或过滤消息。使用属性transforms配置链。属性包含要应用的转换的逗号分隔的逻辑名称列表。然后,属性transforms.<‍logical_name‍>.type为每个转换定义了实现类的名称,transforms.<‍logical_name‍>.*配置选项将传递给转换。
配置示例:

final Properties props = new Properties();
...
props.setProperty("transforms", "filter, router");                                               // (1)
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");  // (2)
props.setProperty("transforms.router.regex", "(.*)");                                            // (3)
props.setProperty("transforms.router.replacement", "trf$1");                                     // (3)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");      // (4)

定义了两个转换 - 过滤器和路由器

路由器转换的实现是 org.apache.kafka.connect.transforms.RegexRouter

路由器转换有两个配置选项 - 正则表达式和替换

过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform

消息转换谓词

谓词可以应用于转换,以使转换成为可选的。

配置示例如下

final Properties props = new Properties();
...
props.setProperty("transforms", "filter");                                                 // (1)
props.setProperty("predicates", "headerExists");                                           // (2)
props.setProperty("predicates.headerExists.type", "org.apache.kafka.connect.transforms.predicates.HasHeaderKey"); //(3)
props.setProperty("predicates.headerExists.name", "header.name");                          // (4)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");// (5)
props.setProperty("transforms.filter.predicate", "headerExists");                          // (6)
props.setProperty("transforms.filter.negate", "true"); 

定义了一个转换 - 过滤器

定义了一个谓词 - headerExists

headerExists 谓词的实现是 org.apache.kafka.connect.transforms.predicates.HasHeaderKey

headerExists 谓词有一个配置选项 - name

过滤器转换的实现是 io.debezium.embedded.ExampleFilterTransform

过滤器转换需要谓词 headerExists

过滤器转换期望谓词的值被否定,从而使谓词确定标头是否不存在

高级记录使用

对于某些用例,例如尝试批量写入记录或针对异步 API 时,上面描述的功能接口可能具有挑战性。在这些情况下,使用 io.debezium.engine.DebeziumEngine.ChangeConsumer. 接口可能会更容易。

此接口具有单个函数,其签名如下:

/*** Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)}* for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.* @param records the records to be processed* @param committer the committer that indicates to the system that we are finished*/void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;

如Javadoc中所提到的,RecordCommitter对象将在每个记录和每个批次完成时被调用。RecordCommitter接口是线程安全的,这允许对记录进行灵活的处理。

您可以选择重写已处理的记录的偏移量。这可以通过首先调用RecordCommitter#buildOffsets()构建一个新的Offsets对象,使用Offsets#set(String key, Object value)更新偏移量,然后使用更新后的Offsets调用RecordCommitter#markProcessed(SourceRecord record, Offsets sourceOffsets)来完成。

要使用ChangeConsumer API,您必须将接口的实现传递给通知API,如下所示:

class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {...}
}
// Create the engine with this configuration ...
DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(props).notifying(new MyChangeConsumer()).build();

如果使用 JSON 格式(等效格式也适用于其他格式),则代码将如下所示:

class JsonChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {public void handleBatch(List<ChangeEvent<String, String>> records,RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {...}
}
// Create the engine with this configuration ...
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class).using(props).notifying(new JsonChangeConsumer()).build();

引擎属性

除非有默认值,否则以下配置属性是必需的(为了文本格式化,Java 类的包名称被替换为 <…​>)。

属性默认值描述
name连接器实例的唯一名称。
connector.class连接器的 Java 类的名称
offset.storage负责连接器偏移持久性的 Java 类的名称。
offset.storage.file.filename存储偏移量的文件的路径。
offset.storage.topic要存储偏移量的 Kafka 主题的名称。
offset.storage.partitions创建偏移量存储主题时使用的分区数。
offset.storage.replication.factor创建偏移存储主题时使用的复制因子。
offset.commit.policy提交策略的 Java 类的名称。它根据处理的事件数和自上次提交以来经过的时间定义何时触发偏移提交。默认是基于时间间隔的定期提交策略。
offset.flush.interval.ms60000尝试提交偏移的间隔。默认值为 1 分钟。
offset.flush.timeout.ms5000在取消该过程并恢复将来尝试提交的偏移数据之前,等待记录刷新和分区要提交到偏移存储的最大毫秒数。默认值为 5 秒。
errors.max.retries-1失败前连接错误的最大重试次数(-1 = 无限制,0 = 禁用,> 0 = 重试次数)。
errors.retry.delay.initial.ms300遇到连接错误时重试的初始延迟(以毫秒为单位)。每次重试时此值将加倍,但不会超过 errors.retry.delay.max.ms。
errors.retry.delay.max.ms10000遇到连接错误时重试之间的最大延迟(以毫秒为单位)。

异步引擎属性

属性默认值描述
record.processing.threads根据工作负载和可用 CPU 核心数按需分配线程。可用于处理更改事件记录的线程数。如果未指定任何值(默认值),则引擎将使用 Java ThreadPoolExecutor 根据当前工作负载动态调整线程数。最大线程数是给定计算机上的 CPU 核心数。如果指定了值,则引擎将使用 Java 固定线程池方法创建具有指定线程数的线程池。要使用给定计算机上的所有可用核心,请设置占位符值 AVAILABLE_CORES。
record.processing.shutdown.timeout.ms1000调用任务关闭后等待处理已提交记录的最长时间(以毫秒为单位)。
record.processing.orderORDERED确定应如何生成记录。ORDERED记录按顺序处理;也就是说,它们按从数据库获取的顺序生成。UNORDERED记录按非顺序处理;也就是说,它们可以按与源数据库不同的顺序生成。UNORDERED 选项的非顺序处理可实现更好的吞吐量,因为记录在任何 SMT 处理和消息序列化完成后立即生成,而无需等待其他记录。当向引擎提供 ChangeConsumer 方法时,此选项不起作用。
record.processing.with.serial.consumerfalse指定是否应从提供的 Consumer 创建默认的 ChangeConsumer,从而导致串行 Consumer 处理。如果您在使用 API 创建引擎时指定了 ChangeConsumer 接口,则此选项无效。
task.management.timeout.ms180,000 (3 min)引擎等待任务生命周期管理操作(启动和停止)完成的时间(以毫秒为单位)。

数据库模式历史属性

一些连接器还需要一组额外的属性来配置数据库模式历史记录:

  • MySQL
  • SQL Server
  • Oracle
  • Db2

如果没有正确配置数据库模式历史记录,则连接器将拒绝启动。默认配置需要可用的Kafka集群。对于其他部署,可使用基于文件的数据库模式历史记录存储实现。

属性默认值描述
schema.history.internal负责持久保存数据库模式历史的 Java 类的名称。
schema.history.internal.file.filename存储数据库架构历史记录的文件的路径。
schema.history.internal.kafka.topic存储数据库架构历史记录的 Kafka 主题。
schema.history.internal.kafka.bootstrap.servers要连接的 Kafka 集群服务器的初始列表。集群提供用于存储数据库架构历史记录的主题。

处理故障

当引擎执行时,其连接器会主动记录每个源记录中的源偏移,并且引擎会定期将这些偏移刷新到持久存储中。当应用程序和引擎正常关闭或崩溃时,重新启动后,引擎及其连接器将从最后记录的偏移处恢复读取源信息。

那么,当嵌入式引擎正在运行时应用程序发生故障会发生什么?结果是,在重新启动后,应用程序很可能会收到一些之前在崩溃之前已经处理过的源记录。这取决于引擎多久将偏移刷新到其存储中(通过offset.flush.interval.ms属性)以及特定连接器在一个批次中返回多少个源记录。最理想的情况是每次都刷新偏移量(例如,将offset.flush.interval.ms设置为0),但即使这样,嵌入式引擎仍然只会在从连接器接收到每个源记录批次后刷新偏移量。
例如,MySQL连接器使用max.batch.size来指定批次中可能出现的源记录的最大数量。即使将offset.flush.interval.ms设置为0,当应用程序在崩溃后重新启动时,可能会看到最多n个重复记录,其中n是批次的大小。如果将offset.flush.interval.ms属性设置得更高,则应用程序可能会看到最多n * m个重复记录,其中n是批次的最大大小,m是在单个偏移刷新间隔期间可能累积的批次数。(显然,可以将嵌入式连接器配置为不进行批处理并始终刷新偏移量,从而使应用程序永远不会接收到任何重复的源记录。但是,这会大大增加开销并降低连接器的吞吐量。)

总的来说,当使用嵌入式连接器时,应用程序在正常操作期间(包括在正常关闭后重新启动)将仅接收到每个源记录一次,但在崩溃或不正确关闭后重新启动后,需要容忍接收到重复事件。如果应用程序需要更严格的确切一次性行为,那么应该使用完整的Debezium平台,该平台可以提供确切一次性保证(即使在崩溃和重新启动后)。

相关文章:

Debezium日常分享系列之:Debezium Engine

Debezium日常分享系列之&#xff1a;Debezium Engine 依赖打包项目在代码中输出消息格式消息转换消息转换谓词高级记录使用引擎属性异步引擎属性数据库模式历史属性处理故障 Debezium连接器通常通过部署到Kafka Connect服务来运行&#xff0c;并配置一个或多个连接器来监视上游…...

运行 GreatSQL 时为什么要求关闭透明大页

在大部分运维规范中&#xff0c;一般都会要求在运行 GreatSQL/MySQL 的环境中要关闭透明大页&#xff0c;那么到底什么是透明大页&#xff0c;为什么要关闭&#xff0c;打开有什么风险吗&#xff1f; 在此之前&#xff0c;我也是有点懵的&#xff0c;本文试着回答这个疑问&…...

【Rive】Rive在Android上的简单应用

1 前言 Rive 是一款强大的矢量图编辑器&#xff0c;可以设计图形、也可以制作动画。Rive 提供了矩形、圆形、三角形、多边形、星形、钢笔、文字等工具来绘制各式各样的矢量图形&#xff1b;提供了平移、旋转、缩放等工具对矢量图形进行各种变换&#xff1b;提供了骨骼、约束、时…...

Base 崛起,SynFutures 或成生态系统中最具潜力应用

10月份的 Unchained Crypto 采访中&#xff0c;Solana 联合创始人 Anatoly 表示&#xff0c;通过观察活跃地址数、TVL、DeFi 版块、Meme 热潮和开发者生态等多个关键指标&#xff0c;察觉到 Base 势头正猛&#xff0c;成为以太坊生态最强劲的 L2。 11月下旬&#xff0c;小狐狸创…...

探索Go语言中的循环双向链表

简介 循环双向链表将双向链表的灵活性与循环结构相结合&#xff0c;使得每个节点都有一个指向前一个节点和后一个节点的指针&#xff0c;并且最后一个节点的Next指针指向头节点&#xff0c;形成一个闭环。本文将深入探讨如何在Go语言中实现和操作这种数据结构。 循环双向链表…...

Leetcode617.合并二叉树(HOT100)+Leetcode79. 单词搜索(HOT100)

链接 代码&#xff1a; class Solution { public:TreeNode* mergeTrees(TreeNode* root1, TreeNode* root2) {if(!root1)return root2;if(!root2)return root1;root1->valroot2->val;root1->left mergeTrees(root1->left,root2->left);root1->right merg…...

亚马逊云(AWS)使用root用户登录

最近在AWS新开了服务器&#xff08;EC2&#xff09;&#xff0c;用于学习&#xff0c;遇到一个问题就是默认是用ec2-user用户登录&#xff0c;也需要密钥对。 既然是学习用的服务器&#xff0c;还是想直接用root登录&#xff0c;下面开始修改&#xff1a; 操作系统是&#xff1…...

使用Docker在Ubuntu 22.04上部署MySQL数据库的完整指南

使用Docker在Ubuntu 22.04上部署MySQL数据库的完整指南 在现代应用开发中&#xff0c;使用Docker来部署数据库已成为一种流行的做法。本文将详细介绍如何在Ubuntu 22.04系统上使用Docker部署最新版本的MySQL数据库&#xff0c;包括关键注意事项、详细步骤、闭坑指南以及总结。…...

算法笔记:力扣15、三数之和

思路&#xff1a; 实现代码 class Solution {public List<List<Integer>> threeSum(int[] nums) {List<List<Integer>> result new ArrayList<>(); Arrays.sort(nums); // 先对数组进行排序 for (int i 0; i < nums.length - 2; i) { /…...

perf list PMU 缓存事件

事件标识事件解释PMU事件路径l1d_cacheL1数据缓存的访问次数&#xff0c;L1缓存是CPU内部最快的缓存&#xff0c;位于距离CPU核心非常近的位置。armv8_pmuv3/l1d_cache/l1d_cache_lmiss_rd表示从L1数据缓存读取数据时发生缓存未命中的次数。armv8_pmuv3/l1d_cache_lmiss_rd/l1d…...

使用C#开发VTK笔记(一)-VTK开发环境搭建

一.使用C#开发VTK的背景 因为C#开发的友好性,一直都比较习惯于从C#开发程序。而长期以来,都希望有一个稳定可靠的三位工程数模的开发演示平台,经过多次对比之后,感觉VTK和OpenCasCade这两个开源项目是比较好的,但它们都是用C++编写的,我用C#形式开发,只能找到发布的C#组…...

2024Selenium自动化常见问题!

"NoSuchElementException"异常&#xff1a; 确保使用了正确的选择器来定位元素。可以使用id、class、XPath或CSS选择器等。 可以尝试使用find_elements方法来查找元素列表&#xff0c;并检查列表的长度来判断元素是否存在。 使用显式等待&#xff08;WebDriverWait…...

考研英语翻译与大小作文

名词动化词 1 持有 harbor2 2 反映 mirror 3 缩短 bridge 4 使用 harness 5 掩饰 mask/veil 6 修改 tailor 7 汇集 pool 8 控制 curb 9 想象 picture 10 激发 trigger 拉丁…...

详解Rust异步编程

文章目录 多线程编程与异步编程对比并发模型对比分析异步编程基础概念及用法 Rust的异步编程通过async/await语法和Future特性提供了一种高效的方式来处理并发任务&#xff0c;尤其在I/O密集型操作中表现出色。async/await异步编程模型性能高&#xff0c;还能支持底层编程&…...

Vue + Element UI 实战技巧:如何实现 el-table 重新加载数据后折叠所有展开行

在 Vue 中使用 Element UI 的 el-table 组件时&#xff0c;如果你想要在数据重新加载后折叠所有行的展开状态&#xff0c;你可以通过维护一个数据属性来追踪哪些行是展开的&#xff0c;并在数据更新时重置这个属性。 以下是一个简单的示例来说明如何实现这个功能&#xff1a; …...

linux静态链接和动态链接

静态链接的特点 程序独立性高 静态链接是在程序编译时&#xff0c;将所有需要的目标文件以及它们所依赖的库文件中的代码和数据链接成一个可执行文件。一旦链接完成&#xff0c;这个可执行文件就包含了运行所需的全部内容&#xff0c;不依赖外部的库文件。例如&#xff0c;一个…...

计算机网络学习资料全攻略

计算机网络是计算机科学中一个非常重要的分支&#xff0c;它涉及到数据在计算机系统之间的传输和通信。随着互联网的快速发展&#xff0c;对计算机网络知识的掌握变得越来越重要。本文将为您提供一份全面的计算机网络学习资料指南&#xff0c;帮助您从基础到高级逐步深入学习。…...

第七课 Unity编辑器创建的资源优化_UI篇(UGUI)

上期我们学习了简单的Scene优化&#xff0c;接下来我们继续编辑器创建资源的UGUI优化 UI篇&#xff08;UGUI&#xff09; 优化UGUI应从哪些方面入手&#xff1f; 可以从CPU和GPU两方面考虑&#xff0c;CPU方面&#xff0c;避免触发或减少Canvas的Rebuild和Rebatch&#xff0c…...

Go的简单问题问答

基础问题回答 Go 的主要特点是什么&#xff1f; 简洁&#xff1a;语法简化&#xff0c;减少复杂性。并发&#xff1a;内置 Goroutine 和 Channel&#xff0c;支持轻量级并发。静态类型&#xff1a;强类型语言&#xff0c;编译时检查错误。跨平台&#xff1a;编译生成独立的二进…...

SVN迁移至Git,保留commit提交记录

SVN迁移至Git 如何将 SVN 仓库迁移到 Git 并保留提交记录一、生成userinfo.txt二. 使用 git svn clone 命令迁移 SVN 到 Git2.1. 基本命令格式2.2. 示例&#xff1a;从 SVN 克隆到 Git参数说明&#xff1a;2.3 执行的过程遇到的窗口2.4. 迁移过程 三. 将 Git 仓库推送到远程 Gi…...

一站式指导:在Neo4j与PostgreSQL间实现高效数据同步

作者&#xff1a;后端小肥肠 &#x1f347; 我写过的文章中的相关代码放到了gitee&#xff0c;地址&#xff1a;xfc-fdw-cloud: 公共解决方案 &#x1f34a; 有疑问可私信或评论区联系我。 &#x1f951; 创作不易未经允许严禁转载。 姊妹篇&#xff1a; 数据同步的艺术&#…...

linux-安全-iptables防火墙基础笔记

目录 一、 iptables链结构 五链 二、 iptables表结构 四表 三、 匹配流程 四、 语法 五、 匹配 1. 通用匹配 2. 隐含匹配 3. 显示匹配 六、 SNAT 七、 DNAT 八、 规则备份及还原 1. 备份 2. 还原 这篇将讲解iptables防火墙的基础知识 一、 iptables链结构 规则…...

Redis——主从复制原理

Redis的主从复制原理是其高可用性和分布式读取能力的重要基础。以下是Redis主从复制原理的详细解释&#xff1a; 一、主从复制的基本概念 Redis的主从复制是一种数据复制和备份的方式&#xff0c;它允许一个主节点&#xff08;Master&#xff09;将其所有的数据同步到一个或多…...

vue2 虚拟DOM 和 真实DOM (概念、作用、Diff 算法)

虚拟 DOM 和 真实DOM&#xff08;概念、作用、Diff 算法&#xff09; 1.1 概念 真实 DOM&#xff08;Document Object Model&#xff09;&#xff1a;是浏览器中用于表示文档结构的树形结构。 <h2>你好</h2>虚拟DOM&#xff1a;用 JavaScript 对象来模拟真实 DOM…...

王道考研编程题总结

我还在完善中&#xff0c;边复习边完善&#xff08;这个只是根据我自身总结的&#xff09; 一、 线性表 1. 结构体 #define MaxSize 40 typedef struct{ElemType data[MaxSize]&#xff1b;int length; }SqList 2. 编程题 1. 删除最小值 题意 &#xff1a;从顺序表中删除…...

手机租赁系统开发全攻略 创新服务助力企业智能转型

内容概要 在当今数字化飞速发展的时代&#xff0c;“手机租赁系统开发”正逐渐成为企业智能转型的必然选择。这一过程并不简单&#xff0c;但关键流程的解析将帮助企业理清思路。首先&#xff0c;了解需求和目标是基础&#xff0c;之后制定详细计划和流程图&#xff0c;让整件…...

git回退到某个版本git checkout和git reset命令的区别

文章目录 1. git checkout <commit>2. git reset --hard <commit>两者的区别总结推荐使用场景* 在使用 Git 回退到某个版本时&#xff0c; git checkout <commit> 和 git reset --hard <commit> 是两种常见的方式&#xff0c;但它们的用途和影响有很…...

如何使用Spring Boot进行Web开发?

Spring Boot 是一个基于 Java 的框架&#xff0c;它简化了新 Spring 应用的初始设置和开发过程。使用 Spring Boot 进行 Web 开发可以让你快速创建独立的、生产级别的基于 Spring 的应用。下面是使用 Spring Boot 进行 Web 开发的基本步骤&#xff1a; 文章目录 1. 环境准备2. …...

error=‘null‘], commandType=io.lettuce.core.RedisPublisher$SubscriptionCommand]

问题 查看java应用启动日志输出下面错误&#xff1a; errornull], commandTypeio.lettuce.core.RedisPublisher$SubscriptionCommand] Completing command LatencyMeteredCommand [typeINFO, outputStatusOutput [output# Server redis_version:4.0.14 redis_git_sha1:000…...

AI PC处理器ARM架构-引入NPU和大模型

AI PC处理器架构变化&#xff1a;ARM低功耗、引入NPU和大模型 AI进化加速端侧落地&#xff0c;新一轮浪潮蓄势待发(2024)”。ARM(Advanced RISC Machine)架构和x86架构是两种主要的处理器架构&#xff0c;它们在设计理念、应用场景和性能特点等方面有显著的差异。 ARM架构是一…...

python之opencv库Haar级联分类器检测人脸--‘haarcascade_frontalface_default.xml‘

python之opencv库Haar级联分类器检测人脸–‘haarcascade_frontalface_default.xml’ opencv库&#xff1a; 它由 Intel 公司发起并参与开发&#xff0c;其初衷是为了提供高效的计算机视觉算法实现。随着计算机视觉领域的发展&#xff0c;OpenCV不断更新和完善&#xff0c;吸引…...

「Mac畅玩鸿蒙与硬件37」UI互动应用篇14 - 随机颜色变化器

本篇将带你实现一个随机颜色变化器应用。用户点击“随机颜色”按钮后&#xff0c;界面背景会随机变化为淡色系颜色&#xff0c;同时显示当前的颜色代码&#xff0c;页面还会展示一只猫咪图片作为装饰&#xff0c;提升趣味性。 关键词 UI互动应用随机颜色生成状态管理用户交互…...

确定 POST 请求中的数据字段

在使用 requests 进行 HTTP 请求时&#xff0c;data 和 params 是两种常见的参数&#xff0c;用于传递不同类型的数据。以下是它们的作用和区别&#xff1a; 1. data 的作用 用于 POST 请求的主体。通常传递表单数据或 JSON 数据。在 HTTP 请求中&#xff0c;data 中的内容会…...

Linux - DNS服务器

六、DNS服务器 1、简介 DNS&#xff08;Domain Name System&#xff09;是互联网上的一项服务&#xff0c;它作为将域名和IP地址相互映射的一个分布式 数据库&#xff0c;能够使人更方便的访问互联网。 DNS系统使用的是网络的查询&#xff0c;那么自然需要有监听的port。DNS使…...

探究 SpringBoot 结合 MVC 高校办公室行政事务管理系统的设计与应用实现

摘 要 身处网络时代&#xff0c;随着网络系统体系发展的不断成熟和完善&#xff0c;人们的生活也随之发生了很大的变化&#xff0c;人们在追求较高物质生活的同时&#xff0c;也在想着如何使自身的精神内涵得到提升&#xff0c;而读书就是人们获得精神享受非常重要的途径。为了…...

蓝桥杯-扫雷

这题不难&#xff0c;就是麻烦一点&#xff0c;这里暴力求解了直接 题目链接&#xff1a; 扫雷 AC代码&#xff1a; import java.util.Scanner; // 1:无需package // 2: 类名必须Main, 不可修改public class Main {public static void main(String[] args) {Scanner scan ne…...

Hive高可用配置

在hive的商用上没有集群一说&#xff0c;而且它本身也不是数据库&#xff0c;只是hadoop的数据sql化工具&#xff0c;但是hive可以配置高可用&#xff0c;通常业内对元数据服务会开5个&#xff0c;而HS2服务开3个&#xff0c;来保证hive服务的高可用 配置方式也很简单&#xf…...

探索AI新世界!热门工具与学习资源免费获取

​抖知书老师推荐&#xff1a; 人工智能技术的迅速发展让人们既充满期待又有些迷茫。有人担忧被AI技术取代&#xff0c;有人却积极拥抱这场科技浪潮。无论你处于哪种心态&#xff0c;人工智能已经深入到我们生活的方方面面。如果你希望轻松掌握最新的AI工具与动态&#xff0c;…...

MAUI APP开发蓝牙协议的经验分享:与跳绳设备对接

在开发MAUI应用程序时&#xff0c;蓝牙协议的应用是一个重要的环节&#xff0c;尤其是在需要与外部设备如智能跳绳进行数据交换的场景中。以下是我在开发过程中的一些经验和心得&#xff0c;希望能为你的项目提供帮助。 1. 蓝牙协议基础 蓝牙协议是无线通信的一种标准&#x…...

常见Linux命令(详解)

文章目录 常见Linux命令文件目录类命令pwd 打印当前目录的绝对路径ls 列出目录内容cd 切换路径mkdir 建立目录rmdir 删除目录touch 创建空文件cp 复制文件或目录rm 移除文件或者目录mv 移动文件与目录或重命名cat 查看文件内容more 文件分屏查看器less 分屏显示文件内容head 显…...

LeetCode763. 划分字母区间(2024冬季每日一题 23)

给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。 返回一个表示每个字符串片段的长度的列表。 示例 1&a…...

【k8s 深入学习之 event 聚合】event count累记聚合(采用 Patch),Message 聚合形成聚合 event(采用Create)

参考 15.深入k8s:Event事件处理及其源码分析 - luozhiyun - 博客园event 模块总览 EventRecorder:是事件生成者,k8s组件通过调用它的方法来生成事件;EventBroadcaster:事件广播器,负责消费EventRecorder产生的事件,然后分发给broadcasterWatcher;broadcasterWatcher:用…...

Java--数组的定义与使用

1.数组的基本概念 1.1为什么用数组 在程序设计中,每一个数据总是对应一个变量.当数据量越大,就需要更多的变量来存储.我们将相同类型的数据存储到一个集合中,就可以更方便我们对数据进行访问,同时可以减少不断定义变量.这个集合就叫做数组 1.2数组的定义 数组是一种基本的数…...

tcpdump抓包wireshark分析

背景 分析特定协议的数据包&#xff0c;如 HTTP、DNS、TCP、UDP 等&#xff0c;诊断网络问题&#xff0c;例如连接故障、延迟和数据包丢失。 大概过程 1.安装tcpdump yum update yum install tcpdump2.抓包&#xff0c;从当前时间起&#xff0c;一小时后停止&#xff0c…...

qtcanpool 知 09:测试框架

文章目录 前言不满改进优化后语 前言 很久以前&#xff0c;作者写的代码都没有测试用例&#xff0c;最多就是写个 demo 验证一下&#xff0c;毕竟不是专业出身&#xff0c;也没经过大公司的洗礼。 后来&#xff0c;参与到一些项目才知道有专门的测试&#xff0c;而且开发也要测…...

使用Apache HttpClient发起一个GET HTTP请求

Apache HttpClient 是一个强大且灵活的Java库&#xff0c;用于处理HTTP请求。 它提供了广泛的功能&#xff0c;包括对不同HTTP方法的支持、连接管理、Cookie处理等。 无论是与RESTful API交互、下载网页内容还是自动化网页任务&#xff0c;Apache HttpClient 都能通过其简洁而…...

C++ STL 容器系列(三)list —— 编程世界的万能胶,数据结构中的百变精灵

STL系列学习参考&#xff1a; C STL系列__zwy的博客-CSDN博客https://blog.csdn.net/bite_zwy/category_12838593.html 学习C STL的三个境界&#xff0c;会用&#xff0c;明理&#xff0c;能扩展&#xff0c;STL中的所有容器都遵循这个规律&#xff0c;下面我们就按照这三个境…...

【前端学习笔记】TypeScript学习

1.什么是TypeScript TypeScript&#xff08;简称 TS&#xff09;是微软公司开发的一种基于 JavaScript &#xff08;简称 JS&#xff09;语言的编程语言。TypeScript 可以看成是 JavaScript 的超集&#xff08;superset&#xff09;&#xff0c;添加了类型系统和编译时类型检查…...

qt三大调试方法总结(printf\qDebug\qCDebug)

文章目录 1 传统方法2 qDebug传统方法扩展1 控制输出扩展2 日志格式扩展3 日志保存扩展4 源码定义护展5 开源扩展3 qCDebug方法扩展1 控制扩展2 格式化扩展3 保存日志扩展4 源码定义参考1 传统方法 #include<stdio.h> printf(“xboard hello printf”) 2 qDebug传统方法…...

耶鲁大学公开课《心理学导论》学习笔记:第 1 课 - 导论

概述 作为一个程序员&#xff0c;或者说&#xff0c;我们不管做什么行业&#xff0c;都可以或多或少的学习一些心理学 我们在生活工作中&#xff0c;其实都会有意无意的接触一些心理学原理&#xff0c;例如&#xff0c;【番茄工作法】、【内在动机与外在激励】 这里选择的是&…...