当前位置: 首页 > news >正文

Kafka Streams 在监控场景的应用与实践

作者:来自 vivo 互联网服务器团队- Pang Haiyun

介绍 Kafka Streams 的原理架构,常见配置以及在监控场景的应用。

一、背景

在当今大数据时代,实时数据处理变得越来越重要,而监控数据的实时性和可靠性是监控能力建设最重要的一环。随着监控业务需求的变化和技术的发展,需要能够实时处理和分析庞大的数据流。作为一种流式处理平台,Kafka Streams 为处理实时数据提供了强大的支持。本文将重点介绍如何利用 Kafka Streams 进行实时数据处理,包括其基本原理、功能和实际应用。通过本文的学习,读者将能够深入了解 Kafka Streams 的优势、在监控场景的应用及实践。

二、Kafka Streams 的基本概念

Kafka Streams 是一个开源的流式处理框架,基于 Kafka 消息队列构建,能够处理无限量的数据流。与传统的批处理不同,Kafka Streams 允许用户以流式处理的方式实时处理数据,而且处理延迟仅为毫秒级。

通过 Kafka Streams ,用户可以进行数据的实时转换、聚合、过滤等操作,同时能够与 Kafka Connect 和 Kafka Producer/Consumer 无缝集成。Kafka Streams 也是一个客户端程序库,用于处理和分析存储在 Kafka 中的数据,并将得到的数据写回 Kafka 或发送到外部系统。

Kafka、Storm、Flink 和 Spark 是大数据领域常用的工具和框架。

1、区别

  • Kafka 是一个分布式消息系统,主要用于构建实时数据管道和事件驱动的应用程序。它提供了高吞吐量、持久性、可伸缩性和容错性,主要用于数据的发布和订阅。

  • Storm 是一个分布式实时计算系统,用于处理实时数据流。它提供了低延迟、高吞吐量的实时计算能力,适用于实时数据处理和流式计算。

  • Flink 是一个流处理引擎,提供了精确一次的状态处理和事件时间处理等特性。它支持流处理和批处理,并提供了统一的 API 和运行时环境。

  • Spark 是一个通用的大数据处理框架,提供了批处理和流处理的功能。Spark 提供了丰富的数据处理和计算功能,包括 SQL 查询、机器学习、图处理等。

2、Kafka 的优势

  • 持久性和可靠性:Kafka 提供了数据持久化的功能,能够确保数据不丢失,并且支持数据的持久存储和重放。

  • 可伸缩性:Kafka 集群可以很容易地进行水平扩展,支持大规模数据处理和高并发访问。

  • 灵活性:Kafka 可以与各种不同的数据处理框架集成,作为数据源或数据目的地,使其在实时数据处理的场景中具有广泛的适用性。

总的来说,Kafka 的优势在于其高吞吐量、持久性和可靠性,以及灵活的集成能力,使其成为构建实时数据管道和事件驱动应用程序的理想选择。

2.1 Stream 处理拓扑

2.1.1 流

流是 Kafka Streams 提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。这里的 key 主要记录的是 value 的索引,决定了 Kafka 和 Kafka Streams 中数据的分区,即数据如何路由到 Topic 的特定分区。value 是主要后续处理器要处理的数据。

图片

2.1.2 处理器拓扑

处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。通过 Kafka Streams ,我们可以编写一个或多个的计算逻辑的处理器拓扑,用于对数据进行多步骤的处理。

2.1.3 流处理器

流处理器是处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。

在拓扑中有两个特别的处理器:

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个 Kafka 主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • sink 处理器(Sink Processor):sink 处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的 Kafka 主题。

图片

(图片来源: Kafka 官网)

Kafka Streams 提供2种方式来定义流处理器拓扑:Kafka  Streams DSL 提供了更常用的数据转换操作,如 map 和 filter;低级别  Processor API 允许开发者定义和连接自定义的处理器,以及和状态仓库交互。处理器拓扑仅仅是流处理代码的逻辑抽象。

2.2 时间

在流处理方面有一些重要的时间概念,它们是建模和集成一些操作的重要元素,例如定义窗口的时间界限。

时间在流中的常见概念如下:

  • 事件时间 - 当一个事件或数据记录发生的时间点,就是最初创建的“源头”。

  • 处理时间 - 事件或数据消息发生在流处理应用程序处理的时间点。即,记录已被消费。处理时间可能是毫秒,小时,或天等。比原始事件时间要晚。

  • 摄取时间 - 事件或数据记录是 Kafka broker 存储在 topic 分区的时间点。与事件时间的差异是,当记录由 Kafka broker 追加到目标 topic 时,生成的摄取时间戳,而不是消息创建时间(“源头”)。与处理时间的差异是处理时间是流处理应用处理记录时的时间。比如,如果一个记录从未被处理,那么就没有处理时间,但仍然有摄取时间。

Kafka Streams 通过 TimestampExtractor 接口为每个数据记录分配一个时间戳。该接口的具体实现了基于数据记录的实际内容检索或计算获得时间戳,例如嵌入时间戳字段提供的事件时间语义,或使用其他的方法,比如在处理时返回当前的 wall-clock(墙钟)时间,从而产生了流应用程序的处理时间语义。因此开发者可以根据自己的业务需要选择执行不同的时间。例如,每条记录时间戳描述了流的时间增长(尽管记录在 stream 中是无序的)并利用时间依赖性来操作,如 join。

最后,当一个 Kafka Streams 应用程序写入记录到 Kafka 时,它将分配时间戳到新的消息。时间戳分配的方式取决于上下文:

  • 当通过处理一些输入记录(例如,在 process()函数调用中触发的 context.forward())生成新的输出记录时,输出记录时间戳直接从输入记录时间戳继承。

  • 当通过周期性函数(如 punctuate())生成新的输出记录时。输出记录时间戳被定义为流任务的当前内部时间(通过 context.timestamp() 获取)。

  • 对于聚合,生成的聚合更新的记录时间戳将被最新到达的输入记录触发更新。

本部分简要介绍了 Kafka Streams 的基本概念,下一部分将介绍 Kafka Streams 的在监控场景的应用实践。

三、Kafka Streams 在监控场景的应用

3.1 链路分布示意图

图片

3.2 示例:使用 Kafka Streams 来处理实时数据

流式处理引擎(如 Kafka Streams)与监控数据 ETL 可以为业务运维带来诸多好处,例如实时数据分析、实时监控、事件驱动的架构等。在本部分,我们将重点介绍  Kafka Streams 与监控数据 ETL 的集成,以及如何在监控数据 ETL 中利用 Kafka Streams 进行实时数据处理。

在监控数据ETL架构中,Kafka Streams 扮演着举足轻重的角色。它可以作为一个独立的数据处理服务来处理实时的数据流,并将处理结果输出到其他存储组件(例如,ES、VM等)中。同时,它也可以作为多个数据源之间的数据交换和通信的桥梁,扮演着数据总线的角色。Kafka Streams 的高可用性、高吞吐量和流式处理能力使得它成为监控数据ETL架构中的重要组件之一。

下面给出一个示例,演示了如何将 Kafka Streams 作为监控数据 ETL 来处理实时的数据。假设我们有一个监控数据流 TopicA,我们希望对这些数据进行实时的分析,并将分析结果输出到另一个 TopicB。我们可以创建一个 Kafka Streams 来处理这个需求:

//创建配置类
Properties props = new Properties();
//设置订阅者
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-service");
//设置servers地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");StreamsBuilder builder = new StreamsBuilder();
//构建流
KStream<String, String> userActions = builder.stream("TopicA");
//对流进行处理
KTable<String, Long> userClickCounts = userActions.filter((key, value) -> value.contains("click")).groupBy((key, value) -> value.split(":")[0]).count();
//流写回Kafka
userClickCounts.toStream().to("TopicB", Produced.with(Serdes.String(), Serdes.Long()));KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();

在这个示例中,我们创建了一个 Kafka Streams 监控数据 ETL,用于处理实时的监控数据流。它对数据进行了过滤、分组和统计分析,并将结果输出到 TopicB。通过这个 ETL,我们可以很容易地实现实时的数据处理功能,并且能够与其他数据源和数据存储组件进行无缝的集成。

3.3 监控 ETL 的流处理示意图

图片

本部分介绍了 Kafka Streams 的在监控场景的应用实践,下一部分将深入探讨 Kafka Streams 的运作原理及实时数据处理的常见操作,并阐述 Kafka Streams 如何实现这些操作。

四、监控数据 ETL 中 Kafka Streams 的运作原理

4.1 架构

Kafka Streams 通过生产者和消费者,并利用 Kafka 自有的能力来提供数据平行性,分布式协调性,故障容错和操作简单性,从而简化了应用程序的开发,在本节中,我们将描述 Kafka Streams 是如何工作的。

下图展示了 Kafka Streams 应用程序的解剖图,让我们来看一下。

图片

(图片来源: Kafka 官网)

Kafka 消费者通过消费1个或多个 Topic 拿到数据,形成输入 Kafka 流,经过处理器拓扑对数据进行统一处理形成输出 Kafka 流,将数据写入1个或多个出流 Topic,这是 kafka 流整体的运行流程。

4.1.1 Stream 分区和任务

Kafka 分区数据的消息层用于存储和传输,Kafka Streams  分区数据用于处理, 在这两种情况下,这种分区规划和设计使数据具有弹性,可扩展,高性能和高容错的能力。Kafka Streams 使用了分区和任务的概念,基于 Kafka 主题分区的并行性模型。在并发环境里,Kafka  Streams 和 Kafka 之间有着紧密的联系:

  • 每个流分区是完全有序的数据记录队列,并映射到 Kafka 主题的分区。

  • 流的数据消息与主题的消息映射。

  • 数据记录中的 keys 决定了 Kafka 和 Kafka Streams  中数据的分区,即,如何将数据路由到指定的分区。

应用程序的处理器拓扑通过将其分成多个任务来进行扩展,更具体点说,Kafka Streams 根据输入流分区创建固定数量的任务,其中每个任务分配一个输入流的分区列表(即,Kafka 主题)。分区对任务的分配不会改变,因此每个任务是应用程序并行性的固定单位。然后,任务可以基于分配的分区实现自己的处理器拓扑;他们还可以为每个分配的分区维护一个缓冲,并从这些记录缓冲一次一个地处理消息。作为结果,流任务可以独立和并行的处理而无需手动干预。

重要的是要理解 Kafka Streams 不是资源管理器,而是可在任何地方都能“运行”的流处理应用程序库。多个实例的应用程序在同一台机器上执行,或分布多个机器上,并且任务可以通过该库自动的分发到这些运行的实例上。分区对任务的分配永远不会改变;如果一个应用程式实例失败,则这些被分配的任务将自动地在其他的实例重新创建,并从相同的流分区继续消费。

下面展示了2个分区,每个任务分配了输出流的1个分区。

图片

(图片来源: Kafka 官网)

4.1.2 线程模型

Kafka Streams 允许用户配置线程数,可用于平衡处理应用程序的实例。每个线程的处理器拓扑独立的执行一个或多个任务。例如,下面展示了一个流线程运行2个流任务。

图片

(图片来源: Kafka 官网)

启动更多的流线程或更多应用程序实例,只需复制拓扑逻辑(即复制代码到不同的机器上运行),达到并行处理处理不同的 Kafka 分区子集的目的。要注意的是,这些线程之间不共享状态。因此无需协调内部的线程。这使它非常简单在应用实例和线程之间并行拓扑。Kafka 主题分区的分配是通过 Kafka Streams 利用 Kafka 的协调功能在多个流线程之间透明处理。

如上所述,Kafka Streams 扩展流处理应用程序是很容易的:你只需要运行你的应用程序实例,Kafka Streams 负责在实例中运行的任务之间分配分区。你可以启动多个应用程序线程处理多个输入的 Kafka 主题分区。这样,所有运行中的应用实例,每个线程(即运行的任务)至少有一个输入分区可以处理。

4.1.3 故障容错

Kafka Streams 基于 Kafka 分区的高可用和副本故障容错能力。因此,当流数据持久到 Kafka,即使应用程序故障,如果需要重新处理它,它也是可用的。Kafka  Streams 中的任务利用 Kafka 消费者客户端提供的故障容错的能力来处理故障。如果任务故障,Kafka Streams 将自动的在剩余运行中的应用实例重新启动该任务。

此外,Kafka Streams 还确保了本地状态仓库对故障的稳定性。对于每个状态仓库都维持一个追踪所有的状态更新的变更日志主题。这些变更日志主题也分区,因此,每个本地状态存储实例,在任务访问仓里,都有自己的专用的变更日志分区。变更主题日志也启用了日志压缩,以便可以安全的清除旧数据,以防止主题无限制的增长。如果任务失败并在其他的机器上重新运行,则  Kafka Streams 在恢复新启动的任务进行处理之前,重放相应的变更日志主题,保障在故障之前将其关联的状态存储恢复。故障处理对于终端用户是完全透明的。

请注意,任务(重新)初始化的成本通常主要取决于通过重放状态仓库变更日志主题来恢复状态的时间。为了减少恢复时间,用户可以配置他们的应用程序增加本地状态的备用副本(即完全的复制状态)。当一个任务迁移发生时,Kafka Streams 尝试去分配任务给应用实例,提前配置了备用副本的应用实例就可以减少任务(重新)初始化的成本。

4.2 创建流

记录流(KStreams)或变更日志流(KTable或GlobalkTable)可以从一个或多个 Kafka 主题创建源流,(而 KTable 和 GlobalKTable,只能从单个主题创建源流)。

KStreamBuilder builder = new KStreamBuilder();KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
GlobalKTable<String, GenericRecord> source2 = builder.globalTable("topic4", "globalStoreName");

4.3 流回写 Kafka

在处理结束后,开发者可以通过 KStream.to 和 KTable.to 将最终的结果流(连续不断的)写回 Kafka 主题。

joined.to("topic4");

如果已经通过上面的to方法写入到一个主题中,但是如果你还需要继续读取和处理这些消息,可以从输出主题构建一个新流,Kafka Streams 提供了便利的方法,through:

// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream materialized = joined.through("topic4");

4.4 流程序的配置与启执行

除了定义的 topology,开发者还需要在运行它之前在 StreamsConfig 配置他们的应用程序,Kafka Streams 配置的完整列表可以在这里找到。

Kafka Streams 中指定配置和生产者、消费者客户端类似,通常,你创建一个 java.util.Properties,设置必要的参数,并通过 Properties 实例构建一个 StreamsConfig 实例。

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");// Any further settings
settings.put(... , ...);// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);

除了 Kafka Streams 自己配置参数,你也可以为 Kafka 内部的消费者和生产者指定参数。根据你应用的需要。类似于 Streams 设置,你可以通过 StreamsConfig 设置任何消费者和/或生产者配置。请注意,一些消费者和生产者配置参数使用相同的参数名。例如,用于配置 TCP 缓冲的 send.buffer.bytes 或 receive.buffer.bytes。用于控制客户端请求重试的 request.timeout.ms 和 retry.backoff.ms。如果需要为消费者和生产者设置不同的值,可以使用 consumer. 或 producer. 作为参数名称的前缀。

Properties settings = new Properties();// Example of a "normal" setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// Customize a common client setting for both consumer and producer
settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
// Customize different values for consumer and producer
settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);// Alternatively, you can use
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);

你可以在应用程序代码中的任何地方使用 Kafka Streams ,常见的是在应用程序的 main() 方法中使用。

首先,先创建一个 KafkaStreams 实例,其中构造函数的第一个参数用于定义一个 topology builder(Streams DSL的KStreamBuilder,或 Processor API 的 TopologyBuilder)。

第二个参数是上面提到的 StreamsConfig 的实例。

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
//
// OR
//
TopologyBuilder builder = ...; // when using the Processor API
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);

在这点上,内部结果已经初始化,但是处理还没有开始。你必须通过调用 start() 方法启动 Kafka Streams 线程:

// Start the Kafka Streams instance
streams.start();

捕获任何意外的异常,设置 java.lang.Thread.UncaughtExceptionHandler。每当流线程由于意外终止时,将调用此处理程序。

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {public uncaughtException(Thread t, throwable e) {// here you should examine the exception and perform an appropriate action!}
);

close() 方法结束程序。

// Stop the Kafka Streams instance
streams.close();

现在,运行你的应用程序,像其他的 Java 应用程序一样(Kafka Sterams 没有任何特殊的要求)。同样,你也可以打包成 jar,通过以下方式运行:

# Start the application in class com.example.MyStreamsApp
# from the fat jar named path-to-app-fatjar.jar.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp

当应用程序实例开始运行时,定义的处理器拓扑将被初始化成1个或多个流任务,可以由实例内的流线程并行的执行。如果处理器拓扑定义了状态仓库,则这些状态仓库在初始化流任务期间(重新)构建。这一点要理解,当如上所诉的启动你的应用程序时,实际上 Kafka Streams 认为你发布了一个实例。现实场景中,更常见的是你的应用程序有多个实例并行运行(如,其他的 JVM 中或别的机器上)。在这种情况下,Kafka Streams 会将任务从现有的实例中分配给刚刚启动的新实例。

五、监控数据 ETL 中 Kafka Streams 参数及其调优

5.1 必配参数:

  1. bootstrap.servers:这是 Kafka 集群的地址列表,Kafka Streams 使用它来初始化与 Kafka 的连接。

  2. key.deserializer 和 value.deserializer:这些配置定义了流中键和值的序列化和反序列化器。

  3. auto.offset.reset:当没有初始偏移量或偏移量无效时,这个配置定义了 Kafka Streams 如何处理。

  4. group.id:这对于使用 Kafka Streams 的消费者组来说很重要,它定义了消费者组的ID。

5.2 基础参数:

  1. num.stream.threads:定义 Kafka Streams 应用程序中的线程数,默认与处理器的逻辑核心数相等。

  2. state.dir:定义 Kafka Streams 存储状态的本地目录。

  3. threading.max.instances:定义每个主题分区的最大线程实例数,默认与分区数相等。

  4. threading.instances:定义每个主题分区的线程实例数,默认与分区数相等。

5.3 消费者参数:

  1. enable.auto.commit:自动提交偏移量,默认值为"true",建议设置为"false",以便更好地控制偏移量的提交。

  2. commit.interval.ms:提交偏移量的频率,默认值为5000ms,可以根据需要进行调整。

  3. max.poll.records:一次拉取的消息数量,默认值为1000,可以根据网络带宽和处理能力进行调整。

5.4 生产者参数:

  1. batch.size:批量发送消息的大小,默认值通常是16384(字节),可以根据网络带宽和 Kafka 集群的性能进行调整。

  2. linger.ms:消息在生产者缓冲区中的最小停留时间,默认值为100ms,可以根据需要进行调整。

  3. compression.type:压缩类型,可以提高网络带宽利用率,但会增加 CPU 开销。默认值为"none",可以根据需要设置为"gzip"、“snappy"或"lz4”。

对于 Kafka 的调优参数,可以根据实际的应用场景和性能需求进行调整,以达到最佳的性能和稳定性。

六、监控数据 ETL 中 Kafka Streams 的分区倾斜问题原因和解决方式

6.1 原因

分区倾斜是监控数据 ETL 的 Kafka Streams 在处理大规模数据流时遇到的常见问题。分区倾斜指的是在一个流处理应用程序中,某个分区的消息消费速度远远慢于其他分区,或某个分区的延迟积压数据远大于其他分区,导致  Kafka Streams 的实时性受到限制。

产生分区倾斜的原因可能包括:

  1. 数据分布不均匀:原始数据在 Kafka 主题的分区中分布不均匀,导致某些分区的消息量远大于其他分区。

  2. 消费者实例数量不足:在 Kafka Streams 应用程序中,消费者的实例数量不足,无法充分处理所有分区的消息。

  3. 消费者负载不均衡:消费者的负载不均衡(包括但不限于某些消费者实例处理的分区数大于其他实例),导致某些消费者实例处理的消息量远大于其他实例。

  4. 消费者实例负载不均衡:消费者实例性能不一致或性能被挤占,导致消费能力不均衡,消费速率异常小于平均消费速率

6.2 解决方案

  1. 数据均衡策略:在设计 Kafka 主题分区分配策略时,可以采用如轮询(Round-robin)或范围(Range)等均衡策略,使得数据在各个分区之间均匀分布。

  2. 增加消费者实例:根据应用程序的实际情况,适当增加消费者的实例数量,以提高整个系统的处理能力,例如扩容。

  3. 负载均衡策略:在消费者组内部实现负载均衡,如使用均匀分配消费者(Uniform Distribution Consumer)等策略,确保消费者实例之间的负载均衡,例如重启或剔除倾斜分区实例使 Kafka Streams 的分区进行重新分配。

  4. 优化消费者处理逻辑:分析消费者处理消息的速度慢的原因,优化处理逻辑,提高消费者的处理能力。

  5. 调整批次大小和窗口函数:通过调整 Kafka Streams 的批次大小和窗口函数等参数,降低消费者的处理压力。

  6. 使用侧输出:对于一些处理速度较慢的分区,可以考虑使用侧输出将部分消息引流至其他系统处理,减轻消费者负载。

七、总结

本文介绍了 Kafka Streams 在监控场景中的应用,阐述了 Kafka Streams 的基本概念,包括流、处理器拓扑、流处理器、时间概念等,举例说明了 Kafka Streams 在监控实时数据ETL中的具体应用,并详细解释了 Kafka Streams 的运作原理,包括其架构、创建流、流回写 Kafka、流程序配置与启执行等内容。文章还介绍了 Kafka Streams 的参数及其调优方法,以及可能出现的分区倾斜问题及其解决方法。

本文意在让读者对于 Kafka 流在监控业务的实际应用有所认识,并且了解 Kafka 流的基本概念和原理,阅读本文后对构建自己 Kafka 流应用程序有所帮助,能够理解在监控数据 ETL 常见分区倾斜的原理和解决方式。

引用:Kafka 官网 https://kafka.apache.org/

相关文章:

Kafka Streams 在监控场景的应用与实践

作者&#xff1a;来自 vivo 互联网服务器团队- Pang Haiyun 介绍 Kafka Streams 的原理架构&#xff0c;常见配置以及在监控场景的应用。 一、背景 在当今大数据时代&#xff0c;实时数据处理变得越来越重要&#xff0c;而监控数据的实时性和可靠性是监控能力建设最重要的一环…...

【计算机视觉基础CV】03-深度学习图像分类实战:鲜花数据集加载与预处理详解

本文将深入介绍鲜花分类数据集的加载与处理方式&#xff0c;同时详细解释代码的每一步骤并给出更丰富的实践建议和拓展思路。以实用为导向&#xff0c;为读者提供从数据组织、预处理、加载到可视化展示的完整过程&#xff0c;并为后续模型训练打下基础。 前言 在计算机视觉的深…...

Android实现RecyclerView边缘渐变效果

Android实现RecyclerView边缘渐变效果 1.前言&#xff1a; 是指在RecyclerView中实现淡入淡出效果的边缘效果。通过这种效果&#xff0c;可以使RecyclerView的边缘在滚动时逐渐淡出或淡入&#xff0c;以提升用户体验。 2.Recyclerview属性&#xff1a; 2.1、requiresFading…...

springboot结合AES和国密SM4进行接口加密

api接口加密 1.为什么需要api接口加密呢&#xff1f; 1.防止爬虫 2.防止数据被串改 3.确保数据安全 2.如何实现接口加密呢&#xff1f; 3.我们可以使用哪些加密算法来加密呢&#xff1f; AES 密码学中的高级加密标准&#xff08;Advanced Encryption Standard&#xff0c;…...

后端项目java中字符串、集合、日期时间常用方法

我这里只介绍了项目中最常用的哈,比如像集合有很多,但我们最常用的就是ArrayList。 然后我这里会以javascript中的字符串、数组的方法为基准来实现,有些方法js和java会有些区别也会介绍 字符串 每次修改 String 对象都会创建一个新的对象,而 StringBuffer 可以在同一个对象…...

前端框架Vue的路由机制

大家好&#xff0c;我是G探险者。 最近在调试前端代码的时候&#xff0c;遇到一个问题。首先我们有一个门户页面&#xff0c;该页面里面有很多的豆腐块&#xff0c;每个豆腐块会配置一个系统的跳转连接。 我的系统就是其中一个豆腐块&#xff0c;我第一次登录进来之后&#xf…...

flutter 快速实现侧边栏

首先我们写一个侧边栏工具类&#xff0c;示例如下&#xff1a; import package:flutter/material.dart;class Sidebar extends StatelessWidget {overrideWidget build(BuildContext context) {return Drawer(child: ListView(padding: EdgeInsets.zero,children: <Widget&…...

华为数通最新题库 H12-821 HCIP稳定过人中

以下是成绩单和考试人员 HCIP H12-831 HCIP H12-725 安全中级...

算法训练第二十三天|93. 复原 IP 地址 78. 子集 90. 子集 II

93. 复原 IP 地址--分割 题目 有效 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 之间组成&#xff0c;且不能含有前导 0&#xff09;&#xff0c;整数之间用 . 分隔。 例如&#xff1a;"0.1.2.201" 和 "192.168.1.1" 是 有效 IP 地址&…...

JS,递归,处理树形数据组件,模糊查询树形结构数据字段

JS递归如何模糊查询树形结构数据,根据数据中的某一个字段值&#xff0c;模糊匹配 直接拿去使用就行 function filterTreeLabel(arr, label) {let result []arr.forEach((item) > {// if (String(item.POBJECT_NAME).toLowerCase().indexOf(label)!-1) {if (String(item.P…...

前端大数字精度丢失?Choerodon UI 大数字解决方案:精确性与灵活性的结合!

01 引言 在企业项目开发中&#xff0c;数据的精确性是关键。Choerodon UI 的大数字解决方案&#xff0c;通过其高精度计算、数据一致性维护、灵活的数据交互、国际化支持、兼容性保障、定制化格式化等优势&#xff0c;为开发人员提供了一个强大的武器库&#xff0c;以确保在处…...

matlab凸包检测

% 创建一个3D点集 points [1 2 3; 4 5 6; 7 8 9; 10 11 12; 13 14 15]; % 使用convhull函数计算凸包 hull convhull(points); % 输出凸包点的索引 disp(Convex Hull Indices:); disp(hull); % 绘制点集和凸包 figure; scatter3(points(:,1), points(:,2), points(:,3),…...

单节点calico性能优化

在单节点上部署calicov3273后&#xff0c;发现资源占用 修改calico以下配置是资源消耗降低 1、因为是单节点&#xff0c;没有跨节点pod网段组网需要&#xff0c;禁用overlay方式网络(ipip&#xff0c;vxlan),使用route方式网络 配置calico-node的环境变量 CALICO_IPV4POOL_I…...

【芯片设计- RTL 数字逻辑设计入门 番外篇 7.1 -- 基于ATE的IC测试原理】

文章目录 ATE 测试概述Opens/Shorts测试Leakage测试AC测试转自:漫谈大千世界 漫谈大千世界 2024年10月23日 23:17 湖北 ATE 测试概述 ATE(Automatic Test Equipment)是用于检测集成电路(IC)功能完整性的自动测试设备。它在半导体产业中扮演着至关重要的角色,主要用于检…...

oracle 导入数据提示跳过表

imp system/orclorcl fileD:\oracle_back.dmp fully showy logD:\oracle_log.log 今天用上面的命令往 oracle 中导入数据出现一个奇怪的问题 就是所有导入的表都提示 正在跳过表XXX 最后提示成功终止导入, 没有出现警告。 最后select一个表也没导入进来 怪哉怪哉&#xff01;…...

鸿蒙开发(15)案例 排行榜

排行榜 准备图片 定义案例需要的数据模型 创建Models文件&#xff0c; //定义app需要的数据模型export class FruitData{name:string;vote:string;id:string;constructor(id:string,name:string,vote:string,) {this.id idthis.name namethis.vote vote}}排行榜头部 创…...

【Java Web】Axios实现前后端数据异步交互

目录 一、Promise概述 二、Promise基本用法 三、async和await关键字 四、Axios介绍 4.1 Axios基本用法 4.2 Axios简化用法之get和post方法 五、Axios拦截器 六、跨域问题处理 一、Promise概述 axios是代替原生的ajax实现前后端数据交互的一套新解决方案&#xff0c;而…...

SLAAC如何工作?

SLAAC如何工作&#xff1f; IPv6无状态地址自动配置(SLAAC)-常见问题 - 苍然满关中 - 博客园 https://support.huawei.com/enterprise/zh/doc/EDOC1100323788?sectionj00shttps://www.zhihu.com/question/6691553243/answer/57023796400 主机在启动或接口UP后&#xff0c;发…...

微信小程序UI自动化测试实践 !

微信小程序UI自动化测试实践 引言&#xff1a; 随着微信小程序的快速发展&#xff0c;越来越多的企业和开发者开始开发小程序来满足用户的需求。而在开发小程序的过程中&#xff0c;UI自动化测试是一个必不可少的环节&#xff0c;可以帮助开发者减少人工测试的工作量&#xff…...

代码随想录-笔记-其七

我们来到了贪心算法的章节。 贪心算法和其他部分不太一样的是&#xff0c;他更多的是突出一种思路&#xff1a;通过求局部最优解来求全局最优解。因为只是一个大的思想逻辑&#xff0c;针对不同题型总是有不同的解决方案&#xff0c;所以贪心算法也不想其他算法那样有一个很经…...

react身份证回显

1. 处理身份证号的函数 function getAgeSexAndBirthdate(idCard: string): { sex: 男 | 女 | null; birthdate: Date | null } {if (idCard.length ! 18) {console.error(身份证号码必须是18位。);return { sex: null, birthdate: null };}// 提取出生年月日const year parse…...

Hibernate、JPA、Spring DATA JPA、Hibernate 代理和架构

大家好&#xff0c;今天&#xff0c;我们将讨论 Hibernate 和 JPA 架构。 在开始我们的文章之前&#xff0c;我想回答一个重要的问题&#xff1a;为什么我们需要使用 Hibernate、Eclipse Link、EF core 等 ORM 工具&#xff1f; 事实上&#xff0c;这是一个非常好的问题。我们…...

leetcode----mysql

1179. 重新格式化部门表 - 力扣&#xff08;LeetCode&#xff09; 表 Department&#xff1a; ------------------------ | Column Name | Type | ------------------------ | id | int | | revenue | int | | month | varchar | ----…...

盛元广通畜牧与水产品检验技术研究所LIMS系统

一、系统概述 盛元广通畜牧与水产品检验技术研究所LIMS系统集成了检测流程管理、样品管理、仪器设备管理、质量控制、数据记录与分析、合规性管理等功能于一体&#xff0c;能够帮助实验室实现全流程的数字化管理。在水产、畜牧产品的质检实验室中&#xff0c;LIMS系统通过引入…...

EXCEL文件解析

[Excel文件名].xlsx (解压后) │ ├── _rels │ └── .rels (定义关系文件) ├── docProps │ ├── app.xml (应用程序属性) │ └── core.xml (核心文档属性) ├── xl │ ├── _rels │ │ └── workbook.xml.rels (工作簿关系文件) │ ├── …...

【C++】- 掌握STL List类:带你探索双向链表的魅力

文章目录 前言&#xff1a;一.list的介绍及使用1. list的介绍2. list的使用2.1 list的构造2.2 list iterator的使用2.3 list capacity2.4 list element access2.5 list modifiers2.6 list的迭代器失效 二.list的模拟实现1. list的节点2. list的成员变量3.list迭代器相关问题3.1…...

开源 AI 智能名片 S2B2C 商城小程序中运营与产品的关系剖析

摘要&#xff1a;本文聚焦于开源 AI 智能名片 S2B2C 商城小程序&#xff0c;深入探讨其中运营与产品之间的关系。通过分析运营与产品的多种关系认知&#xff0c;阐述在该特定小程序情境下运营与产品相互依存、相互作用的机制&#xff0c;包括运营对产品的需求以及产品对运营的依…...

flask_socketio 以继承 Namespace方式实现一个网页聊天应用

点击进入上一篇&#xff0c;可作为参考 实验环境 python 用的是3.11.11 其他环境可以通过这种方式一键安装&#xff1a; pip install flask3.1.0 Flask-SocketIO5.4.1 gevent-websocket0.10.1 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple pip list 详情如下&am…...

DePIN潜力项目Spheron解读:激活闲置硬件,赋能Web3与AI

DePIN赛道作为今年加密资本关注的热点之一&#xff0c;不仅吸引了大量资金涌入&#xff0c;还凭借其灵活的资源调配、高效的运作方式和可靠的安全性能&#xff0c;逐渐渗透到多个领域和项目中。例如&#xff0c;Helium的无线网络协议、IoTeX的去中心化物联网、IO NET的去中心化…...

《Vue进阶教程》第十六课:深入完善响应式系统之单例模式

往期内容&#xff1a; 《Vue进阶教程》第五课&#xff1a;ref()函数详解(重点) 《Vue进阶教程》第六课&#xff1a;computed()函数详解(上) 《Vue进阶教程》第七课&#xff1a;computed()函数详解(下) 《Vue进阶教程》第八课&#xff1a;watch()函数的基本使用 《Vue进阶教…...

C++ —— const修饰指针

C —— const修饰指针 常量指针&#xff08;实际开发中用的很多&#xff09;指针常量&#xff08;了解即可&#xff09;常指针常量&#xff08;了解即可&#xff09; 常量指针&#xff08;实际开发中用的很多&#xff09; 语法&#xff1a;const 数据类型 *变量名; 不能通过解…...

【学习笔记】数据结构(八)

动态存储管理 文章目录 动态存储管理8.1 概述8.2 可利用空间表及分配方法8.3 边界标识法8.3.1 可利用空间表的结构8.3.2 分配算法8.3.3 回收算法 8.4 伙伴系统8.4.1 可利用空间表的结构8.4.2 分配算法8.4.3 回收算法 8.5 无用单元收集 - 垃圾回收机制8.6 存储紧缩 - 内存碎片化…...

maven-resources-production:ratel-fast: java.lang.IndexOutOfBoundsException

Maven生产环境中遇到java.lang.IndexOutOfBoundsException的问题&#xff0c;尝试了重启电脑、重启IDEA等常规方法无效&#xff0c;最终通过直接重建工程解决了问题。 Rebuild Project 再启动OK...

建投数据与腾讯云数据库TDSQL完成产品兼容性互认证

近日&#xff0c;经与腾讯云联合测试&#xff0c;建投数据自主研发的人力资源信息管理系统V3.0、招聘管理系统V3.0、绩效管理系统V2.0、培训管理系统V3.0通过腾讯云数据库TDSQL的技术认证&#xff0c;符合腾讯企业标准的要求&#xff0c;产品兼容性良好&#xff0c;性能卓越。 …...

后端-添加购物车和查看购物车

...

【HarmonyOS NEXT】Web 组件的基础用法以及 H5 侧与原生侧的双向数据通讯

关键词&#xff1a;鸿蒙、ArkTs、Web组件、通讯、数据 官方文档Web组件用法介绍&#xff1a;文档中心 Web 组件加载沙箱中页面可参考我的另一篇文章&#xff1a;【HarmonyOS NEXT】 如何将rawfile中文件复制到沙箱中_鸿蒙rawfile 复制到沙箱-CSDN博客 目录 如何在鸿蒙应用中加…...

7-2 排序

输入一批未排序的数据&#xff0c;数量不超过30个&#xff0c;请使用选择法或者冒泡法对其排序&#xff0c;并按照规定的要求输出。 输入格式: 先输入待排序的整形数的个数&#xff1b;然后输入所有的待排序的数据。 输出格式: 在一行中按照由大到小的顺序输出排序好的数据…...

Java通过反射破坏单例模式

有个第三方工具类&#xff0c;不支持多例模式。但是又不能直接改第三方工具类的代码&#xff0c;因此可以通过反射破坏第三方工具类的单例。 第三方工具类反编译如下 可以看到构造函数进行了私有化&#xff0c;不允许外部new&#xff0c;只能通过newInstance进行实例化。并且…...

FFmpeg第一话:FFmpeg 简介与环境搭建

FFmpeg 探索之旅 一、FFmpeg 简介与环境搭建 二、FFmpeg 解码详解 第一话&#xff1a;FFmpeg 简介与环境搭建 FFmpeg 探索之旅一、前言二、FFmpeg 是什么&#xff1f;三、简单介绍其历史背景四、为什么用 C学习 FFmpeg&#xff1f;&#xff08;一&#xff09;高性能优势&#…...

C++并发编程: std::atomic对指针进行操作

std::atomic 对指针进行运算的用途 std::atomic 提供了一种在多线程环境中安全地操作指针的方法。这对于实现线程安全的指针管理、动态内存分配、链表操作等场景非常有用。通过使用std::atomic对指针进行运算&#xff0c;可以确保指针操作的原子性和多线程安全性。 常见用途 …...

工业大数据分析算法实战-day08

文章目录 day08模型评价聚类算法基于距离的聚类基于层次的聚类基于密度的聚类基于分布的聚类聚类结果的评价 day08 今天是第8天&#xff0c;昨日阐述了概率图模型和集成学习的分类&#xff0c;主要讲解了有向图和无向图&#xff0c;生成式模型和判断式模型&#xff0c;以及集成…...

在 C# 中实现的目录基础操作

前言 在开发应用程序过程中&#xff0c;对操作系统上的文件夹存储文件和子文件夹操作是常见的需求。.NET中的Directory类提供了处理文件目录的功能。本文介绍如何读取文件夹的属性、获取文件夹的大小及文件个数、创建文件夹、遍历文件夹中的所有文件、移动文件夹和删除文件夹等…...

​Python 标识符是啥?​

Python 的标识符就是我们写代码时用来给变量、函数、类等取名字的东西。 你写的 my_variable 是个标识符&#xff0c; 定义的 add_numbers 函数名也是个标识符&#xff0c; 甚至你写的 Cat 类名&#xff0c;也是标识符。 一句话总结&#xff1a;标识符就是代码里给“东西”起…...

WEB开发: 全栈工程师起步 - Python Flask +SQLite的管理系统实现

一、前言 罗马不是一天建成的。 每个全栈工程师都是从HELLO WORLD 起步的。 之前我们分别用NODE.JS 、ASP.NET Core 这两个框架实现过基于WebServer的全栈工程师入门教程。 今天我们用更简单的来实现&#xff1a; Python。 我们将用Python来实现一个学生管理应用&#xff0…...

将 Matplotlib 图形转换为 PIL 图像并返回

将 Matplotlib 图形转换为 PIL 图像并返回 前言完整代码Matplotlib 中 fig 和 ax 的关系示例&#xff1a; 问题分析常见错误及解决方案总结 前言 Matplotlib 是 Python 里最流行的图表展示库&#xff0c;PIL (Python Imaging Library)则是一个强大的图像处理库。在开发过程中&…...

F5中获取客户端ip地址(client ip)

当F5设备对其原始设置上的所有IP地址使用NAT时&#xff0c;连接到poo成员&#xff08;nodes、backend servers&#xff09;的出站连接将是NAT IP地址。 pool 成员&#xff08;nodes、backend servers&#xff09;将无法看到真实的客户端 ip地址&#xff0c;因为看到的是F5上的…...

点焊机器人维修-ABB-KUKA-FANUC-YASKAWA

在正式启用点焊机器人之前&#xff0c;一项至关重要的预备步骤便是进行焊枪的全面设置操作。以FANUC机器人为例&#xff0c;其焊枪的设置流程涵盖了多个关键环节&#xff0c;如焊枪运动方向的精确规划、焊枪规格的选择以及零点标定的细致执行等。这些设置均须严格依据实际所采用…...

springboot448教学辅助系统(论文+源码)_kaic

摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对信息管理混乱&#xff0c;出错率高&#xff0c;信息安全性差&#x…...

网络地址转换(NAT)和端口映射

1. 网络地址转换(NAT) 1.1 NAT的应用场景 &#xff08;1&#xff09;应用场景&#xff1a;允许将私有IP地址映射到公网地址&#xff0c;以减缓IP地址空间的消耗 ①需要连接Internet&#xff0c;但主机没有公网IP地址 ②更换了一个新的ISP&#xff0c;需要重新组织网络时&…...

iClent3D for Cesium 实现无人机巡检飞行效果

作者&#xff1a;gaogy 1、背景 随着地理信息技术的发展&#xff0c;三维地球技术逐渐成为了许多领域中的核心工具&#xff0c;尤其是在城市规划、环境监测、航空航天以及军事领域。三维地图和场景的应用正在帮助人们更加直观地理解空间数据&#xff0c;提供更高效的决策支持。…...