Kafka 入门与实战
一、Kafka 基础
1.1 创建topic
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create
1.2 查看消费者偏移量位置
kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test
1.3 消息的生产与发送
#生产者
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test#消费者
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
二、Kafka 集群部署
在 Kafka 集群中,每个节点是一个 broker,broker 中可以创建多个 Topic,每个Topic都可以被分成多个Partition,每个 Partition 都是一个有序的、不可变的消息序列。Partition 是Kafka集群中消息存储的最小单元,也是Kafka集群中消息分发和负载均衡的最小单位,Partition 通过副本机制 Replication 来保证数据的高可用性和容错性,每个 Partition 都有一个 leader 的副本和 多个 follower 副本,leader 副本负责接收和处理消息,follower 副本负责复制leader 副本的数据。
2.1 修改server.properties
#设置 broker 不唯一
broker.id=1
#若是在一台机器上,需要更改端口号,避免冲突
listeners=PLAINTEXT://:9091
#日志目录,选择性更改
log.dirs=D:/kafka-cluster/data/kafka-broker-1
2.2 创建启动脚本文件
#zookeeper 启动脚本
@echo off
cd /d %~dp0
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
pause#每个Kafka服务器创建启动脚本
@echo off
cd /d %~dp0
.\bin\windows\kafka-server-start.bat .\config\server.properties
pause
2.3 创建批处理启动脚本
@echo off
cd /d %~dp0
cd ./kafka-broker-1
start zookeeper.bat
ping 127.0.0.1 -n 10 >nul
start kafka.bat
cd ../kafka-broker-2
start kafka.bat
cd ../kafka-broker-3
start kafka.bat
pause
2.4 创建批处理清除脚本
再次重新启动会启动失败,需要清除文件夹
@echo off
cd /d %~dp0
rd /s /q D:\kcluster\data
pause
Windows 下使用反斜杠,若使用正斜杠,可能会报错。
三、zookeeper 服务启动
3.1 kafka 启动,zookeeper节点变化
controller:集群模式下会有多个 broker 节点,而这些broker节点会选举出一个管理者-controller。
zookeeper 中数据存储的方式类似文件:
当有kafka连接 zk 时,就会创建节点,存储相关信息,此时的 controller 是临时节点,当我们的 kafka 服务器关闭时,该节点就会被删除(默认创建临时节点):
3.2 集群模式
当我们以集群的方式去启动时,controller 的选举策略是 “先入为主”,即哪个 broker 先注册至 zk 中,和 zk 建立连接并创建 controller 节点,它就会被选举为 controller。此时其他 broker 节点在尝试创建 controller 节点时就会失败,但仍会监听 controller 节点,如果 controller 节点挂掉了,此时其他 broker 节点就会去竞争创建 controller 节点,先创建的就会被选举为 controller 节点,而其他的 broker 节点会创建失败,继续去监听新的 controller。
3.3 controller 与 broker 通信
监听 /brokers/ids 节点是为了监听是否有新的 broker 节点,并管理这些 broker 节点。
第二个broker 启动流程
四、主题创建
主题创建时需要指定 name,partition 和 replication,partition 是数据负载均衡的最小单位,partition 数一般小于等于 broker 节点数,replication 是partition的副本节点,replication 一般会和partition 存储到不同的 broker 节点。
五、生产数据
5.1 生产流程
先将信息封装成 ProducerRecord ,然后再经过拦截器得到一个经过拦截处理后的ProducerRecord,之后再通过序列化器针对 key 和 value 进行序列化,通过分区器计算数据发送至哪个partition,即发送到对应的 broker 节点,然后加入到缓冲区,直到缓存刷新或者缓存区满后通过 Sender 发送线程将数据发送至 broker。
分区器会通过 MetadataCache 来获取 topic 的相关信息,并针对 partition 来计算对应的分区。(如果设置分区编号,则会直接使用,不会对编号进行校验,如果没有对应的分区,数据则无法正确存储到对应的 topic 中)
5.2 数据的异步发送和同步发送
数据默认是异步发送的, 将数据发送至数据缓冲区中,之后会由 sender 线程来发送。同步发送:
@RequestMapping("/send")public String send(String msg) throws ExecutionException, InterruptedException {CompletableFuture future=kafkaTemplate.send(TOPIC,msg);future.get();return "success";}
5.3 ACKS 应答处理机制
- 0:sender 线程发送消息后,直接返回 ack,此时数据只是放到了网络当中,不会考虑数据是否真正存到 Kafka 中。
- 1:当数据保存到磁盘后,即保存到对应的分区后会直接返回 ack。
- -1(all):当多个副本 replication 都同步消息完成后,才会返回 ack,该级别是最安全的。
5.4 数据重复及乱序的原因
要了解原因首先知道重传机制: 当数据没有正常发送,没有接收到对应的 ack 时,此时就会重新发送,直到发送成功或者超过最大重试次数。
当我们的leader 节点保存数据到磁盘后,在返回ack的时候,由于网络问题,导致连接超时或者ack 丢失,就会导致我们的数据重新发送,此时就会导致数据重复。
由于数据没有正常发送,此时数据就会重新回到缓冲区,sender 线程再重新拉取并发送到对应的 topic 中,而在重新发送成功之前,此时其他消息已经保存到了 topic 中,这时就导致数据乱序。
5.5 幂等性操作
幂等性可以解决上述数据乱序和重复的问题,但是幂等性开启后有以下几点要求:
- acks= -1
- 需要开启重试机制
- 在图请求缓冲区不能大于5
如何解决的?
在broker维护了一个保存生产者生产数据的分区状态 ProducerState ,之里面会维护最近五条消息,在新发送消息后,会去验证消息是否相同,若重复则不会继续添加,若没有重复,则会判断顺序号是否是连续的,如果顺序号不是连续的,则会将数据重新返回发送缓冲区,再重新发送,这也是要求为什么要求开启重试机制。
值得注意的是: 幂等性只能保证一个分区的数据不重复和顺序连续,无法保证多个分区的连续。由于我们的幂等性由生产者id + 数据顺序号来决定的,当我们的 broker 重启,生产者 id就会改变,此时相同的数据由于不同的生产者id 仍然会造成上述问题,也就是说无法实现跨会话幂等。
5.6 事务操作及流程
为了解决跨会话幂等性,可以通过事务来解决。
当我们开启事务后,可以保证broker 节点多次重启,保证生产者 id 不变,这就解决了我们上述的幂等性出现的问题。但是事务仍然只能保证单个分区的幂等性,即开启事务可以保证跨会话的幂等性,但无法保证跨分区的幂等性。引文在 bachMetadata 中保存的只有一个分区的最近五条消息,无法跨会话进行判断数据的重复和顺序。
流程:producer 首先会发送请求事务管理器的所在分区节点,Broker 根据事务 id 的 hash 值并对事务管理器状态分区个数(50)取余,返回对应的事务管理器所在分区。producer 初始化生产者 id,并将数据的分区信息发送给事务管理器。之后 prducer 开始生产数据,并将数据发送对应分区的 leader 节点,当数据保存后,对应的 broker 节点会将数据保存成功后的数据分区信息发送给事务协调器,并向生产者返回 acks 应答响应。producer 接收到应答响应后向事务协调器发送关闭事务,事务协调器接收到请求后,首先会将 __trancsaction_state 中的事务状态修改为 PrepareCommit(准备提交),然后再将事务当前的状态返回给 broker 节点,最后事务协调器会将 __transaction_state 中保存的事务状态改为 CompleteCommit。
六、代码片段
6.1 消费数据偏移量
@RequestMapping("/send")public String send(String msg) throws ExecutionException, InterruptedException {CompletableFuture future=kafkaTemplate.send(TOPIC,msg);future.get();return "success";}//监听所订阅的主题@KafkaListener(topics = {Producer.TOPIC})public void onMessage(String data,Acknowledgment ack){System.out.println("receive: " + data);ack.acknowledge();}
kafka-consumer-groups.bat --bootstrap-server 127.0.0.1:9092 --group test --topic test --reset-offsets --to-earliest --execute
手动更改偏移量可使消费过的数据从头消费
spring:kafka:consumer:producer:#如果之前用相同的消费者组消费过该主题,并且 offset已经记录在 kafka 中,那么从kafka中读取的offset就是该offset,kafka 只会在找不到偏移量时会使用这个配置,如果想要从头消费,可以使用心得消费者group-id,或者手动提交偏移量.auto-offset-reset: earliest# 是否自动提交偏移量enable-auto-commit: false
- earliest:自动将偏移量重置为最早的偏移量
- latest:自动将偏移量重置为最新的偏移量
- none:如果没有为消费者找到以前的偏移量,则向消费者抛异常
6.2 send() 常用方法
@RequestMapping("/send2")public String sendMessage(){Message<String> message= MessageBuilder.withPayload("hello kafka").setHeader(KafkaHeaders.TOPIC,"test").build();kafkaTemplate.send(TOPIC,"hello kafka");return "success";}@RequestMapping("/send3")public String sendRecord(){//Headers 中可以存放一些信息(信息是key-valur 键值对),当消费者接收到消息后,可以拿到这个 headers 里面存放的信息.Headers headers=new RecordHeaders();headers.add("phone","1234567".getBytes(StandardCharsets.UTF_8));headers.add("name","zhangsna".getBytes(StandardCharsets.UTF_8));ProducerRecord record=new ProducerRecord<>("test",0,System.currentTimeMillis(),"k1","hello kafka",headers);kafkaTemplate.send(record);return "success";}//消费者获取 header 信息@KafkaListener(topics = {Producer.TOPIC})public void onMessage(String data, Acknowledgment ack,@Header(value = KafkaHeaders.RECEIVED_TOPIC, required = false) String topic,@Header(value = "name", required = false)String name,@Header(value = "phone", required = false)String phone){User user= JsonUtils.fromJson(data, User.class);System.out.println("receive: " + data.toString()+" topic: "+topic+" name: "+name+" phone: "+phone);ack.acknowledge();}//同步提交@RequestMapping("/send4")public String sendSync(){for(int i=0;i<10;i++) {System.out.println("发送消息: "+i);CompletableFuture future=kafkaTemplate.send("test",i+"");try {SendResult<String,String> result= (SendResult<String, String>) future.get();if(result.getRecordMetadata()!=null){System.out.println("消息发送成功: "+result.getRecordMetadata().toString());}System.out.println(result.getRecordMetadata());} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}System.out.println("成功发送消息: "+i);}return "success";}//异步提交@RequestMapping("/send5")public String sendAsync(){for(int i=0;i<10;i++){System.out.println("发送消息: "+i);CompletableFuture<SendResult<String,String> >future=kafkaTemplate.send("test",i+"");future.thenAccept((sendResult)->{if(sendResult.getRecordMetadata()!=null){System.out.println("消息发送成功: "+sendResult.getRecordMetadata().toString());}System.out.println(sendResult.getProducerRecord());}).exceptionally((t)->{t.printStackTrace();//做失败处理return null;});}return "success";}
6.3 发送引用类型信息
将应用类型转换成 json 对象
package org.aliang.kafkademo.utils;import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class JsonUtils {public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();public static String toJson(Object object) {try {return OBJECT_MAPPER.writeValueAsString(object);} catch (Exception e) {e.printStackTrace();}return null;}public static <T> T fromJson(String json, Class<T> clazz) {try {return OBJECT_MAPPER.readValue(json, clazz);} catch (Exception e) {e.printStackTrace();}return null;}}
生产者消费者代码
@Autowiredprivate KafkaTemplate<String,User> kafkaTemplate2;@RequestMapping("/send6")public void sendObject(User user){String msg= JsonUtils.toJson(user);kafkaTemplate.send("test",msg);}@KafkaListener(topics = {Producer.TOPIC})public void onMessage(String data,Acknowledgment ack){User user= JsonUtils.fromJson(data, User.class);System.out.println("receive: " + data.toString());ack.acknowledge();}
6.4 继承 springboot 创建 topic
@Configuration
public class CreateTopic {@Beanpublic NewTopic newTopic() {return new NewTopic("test", 1, (short) 1);}@Beanpublic NewTopic updateTopic() {return new NewTopic("test", 2, (short) 1);}
}
更新 分区时只能增加分区数量,无法减少数量。
副本分区的数量不能大于 broker 节点个数。
6.5 发送消息配置分区策略
@Value("${kafka.cluster.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;public Map<String, Object> producerConfig(){Map<String,Object> props =new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);return props;}public ProducerFactory<String,Object> producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfig());}@Beanpublic KafkaTemplate<String,Object> kafkaTemplate(){return new KafkaTemplate<>(producerFactory());}
但是当我们运行后,发现并不是每个分区都有数据,不符合我们的轮询算法,这是因为在发送消息的过程中,会调用两次我们的 partition 方法,最终就导致不符合我们的预期。
6.6 配置自定义分区策略
public class CustomPartitioner implements Partitioner {private AtomicInteger nextPartition = new AtomicInteger(0);@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if(key==null){int next=nextPartition.getAndIncrement();if(next>=numPartitions){nextPartition.set(next,0);}
// System.out.println("分区值"+next);return next;}else {//kafka 默认的分区策略return Utils.toPositive(Utils.murmur2(keyBytes))%numPartitions;}}}public Map<String, Object> producerConfig(){Map<String,Object> props =new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);//配置自定义分区策略props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);return props;}
6.7 自定义消息拦截器
public class CustomerInterceptor implements ProducerInterceptor{/*** 发送消息是会调用该方法,可以在拦截器中做一些处理,记录日志操作* @param producerRecord* @return*/@Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {System.out.println("拦截器拦截到消息:"+producerRecord.value());return producerRecord;}/*** @param recordMetadata 服务器返回的元数据信息* @param e*/@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(recordMetadata!=null){System.out.println("消息发送成功,偏移量: "+recordMetadata.offset());}else{System.out.println("消息发送失败");}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}public Map<String, Object> producerConfig(){Map<String,Object> props =new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,CustomerInterceptor.class.getName());return props;}
6.8 消费者同时接受消息体和消息头
@KafkaListener(topics = {Producer.TOPIC})public void onMessage(Acknowledgment ack,ConsumerRecord<String,String> consumerRecord ){System.out.println("接收到consumerRecord消息: "+consumerRecord.toString());ack.acknowledge();}
6.9 指定 topic-partition-offset 消费
@KafkaListener(topicPartitions = @TopicPartition(//监听 test 主题topic = "test",//消费 0,1,2 分区的所有消息partitions = {"0","1","2"},partitionOffsets = {//第三、四个分区的偏移量从 2 开始消费@PartitionOffset(partition = "3",initialOffset = "2"),@PartitionOffset(partition = "4",initialOffset = "2")}))public void onMessage(ConsumerRecord record, Acknowledgment ack){System.out.println("receive: " + record.value()+"partition: "+record.partition()+" offset: "+record.offset());
// ack.acknowledge();}
若配置了监听的分区,但该主题下还有其他分区没配置,例如没有配置 5 分区,则不会消费 partition5 分区的消息。
6.10 批量消费信息
//批量消费数据@KafkaListener(topics = "test")public void onMessage(List<ConsumerRecord<String,String>> recordList, Acknowledgment ack){System.out.println("receive: " + recordList.size());ack.acknowledge();}
#设置批量消费
spring.kafka.type=batch
#批量消费每次消费多少条消息
spring.kafka.consummer.max-poll-records
6.11 集成消费拦截器
1. 实现kafka 的 ConsumerInterceptor 拦截器接口
@Configuration
public class CustomConsumerInterceptor implements ConsumerInterceptor<String,String> {//在监听器消费消息之前执行@Overridepublic ConsumerRecords<String,String> onConsume(ConsumerRecords consumerRecords) {System.out.println("拦截器拦截到消息:"+consumerRecords);return consumerRecords;}//在消息提交偏移量之前执行@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> var) {System.out.println("拦截器执行 onCommit,提交offset");}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
2. 在 kafka 消费者的 ConsumerFactory 配置中注册拦截器
@Configuration
public class KafkaConfig {@Value("${kafka.cluster.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public Map<String, Object> consumerConfig(){Map<String,Object> props =new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");return props;}@Beanpublic ConsumerFactory<String,Object> consumerFactory(){return new DefaultKafkaConsumerFactory<>(consumerConfig());}@Beanpublic KafkaListenerContainerFactory<?> customListenerContainerFactory(ConsumerFactory<String,Object> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory=new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}
3. 消费者代码
@KafkaListener(topics = "test",containerFactory = "customListenerContainerFactory", groupId = "test")public void onMessage(String data, Acknowledgment ack){System.out.println("receive: " + data);ack.acknowledge();}
总结: 通过配置新的监听器工厂,并配置监听器工厂中的消费者厂,消费者中配置自定义拦截器
上述代码虽然实现了自定义消息拦截器,但在运行过程中,发现我们的消费者和监听器的配置都没有生效,这是怎么回事呢?
public static void main(String[] args) {// SpringApplication.run(KafkaDemoApplication.class, args);ApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);Map<String, ConsumerFactory> beansOfType = context.getBeansOfType(ConsumerFactory.class);beansOfType.forEach((k,v)->{System.out.println(k+"-->"+v);});System.out.println("--------------------------------------------------------------");Map<String , KafkaListenerContainerFactory> beansOfType1 = context.getBeansOfType(KafkaListenerContainerFactory.class);beansOfType1.forEach((k,v)->{System.out.println(k+"-->"+v);});}
我们进行调试,从打印信息中可以看到,kafka 监听器有两个bean工厂,有一个是我们自定义的的,一个是默认的,默认的 listener工厂中的消费者配置都是我们的配置文件中的,而我们自定义的 listener 工厂中的消费者配置是我们在创建监听器时,传入的 consumerFactory 中的 ConsumerConfig 中的配置信息,由于我们在监听器制定了自定义 listener 工厂,因此我们配置文件中的配置才会失效,如果想要配置生效,就需要把想要配置的选项重新在 ConsumerConfig 中配置。
6.12 消息的转发
@KafkaListener(topics = "testA", groupId = "test")
// 要转发的 topic@SendTo(value = "testB")public String onMessageA(String data, Acknowledgment ack){System.out.println("TestA receive消息: " + data);ack.acknowledge();return data;}@KafkaListener(topics = "testB", groupId = "test2")public void onMessageB(String data, Acknowledgment ack){System.out.println("TestB receive转发后的消息: " + data);ack.acknowledge();}
注: 在使用@SendTo 注解后,同时配置了新的分区策略和拦截器后,不知道为何原因,因为重新注入了新的 KafkaTemplate,在项目启动后,会找不到对应的 bean,而不使用@SendTo 注解却可以正常加载。就算定义加载顺序(注入的bean的名字也为更改),仍然找不到对应的 bean。原因位置(埋点)
6.12 配置消费者分区策略
- RangeAssignor(默认策略):按范围进行分配,如果由8个分区,3个消费者,C1 消费0、1、2;C2消费 3、4、5;C3 消费 6、7
- RoundRobinAssignor:轮询,如果由8个分区,3个消费者,C1 消费0、3、6;C2消费 1、4、7;C3 消费 2、5
- StickAssignor:尽量保持现有分区不变,当有新的消费者加入或离开后,只更改少量消费者所消费分区,大部分保持不变,仍然保持现有消费分区
- CooperativeStickAssignor:优化后的粘性分区策略
代码:
配置消费策略
@Configuration
public class KafkaConfig {@Value("${kafka.cluster.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;public Map<String, Object> consumerConfig(){Map<String,Object> props =new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
// props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.GROUP_ID_CONFIG,"test");return props;}@Beanpublic ConsumerFactory<String,Object> consumerFactory(){return new DefaultKafkaConsumerFactory<>(consumerConfig());}@Beanpublic KafkaListenerContainerFactory<?> customListenerContainerFactory(ConsumerFactory<String,Object> consumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> factory=new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);return factory;}
}
配置消费者
//批量消费数据@KafkaListener(topics = "test",groupId = "testC",containerFactory = "customListenerContainerFactory",concurrency = "3")public void onMessage(ConsumerRecord record, Acknowledgment ack){System.out.println("receive: " + record.partition() +"分区"+record.value()+"消费者: "+Thread.currentThread().getId());ack.acknowledge();}
七、_consmuer_offsets
在每次消费一个消息并且提交后,会保存当前消费的最近的一个 offset;
在 zookeeper 中,有一个 _consumer_offsets主题,消费者提交的 offset 信息会写入到该topic 中,_consumer_offsets 保存了每个 consumer group 某一时刻提交的 offset 信息,_consumer_offsets 默认有 50 个分区,集群模式 zk 会给每个 broker 节点分配分区
consumer_group 保存到哪个分区中的计算公式:
Math.abs("groupid".hashCode())%groupMetadataTopicPartitionCount
八、ISR、HW、LEO
8.1 ISR:
在同步中的副本(In-Sync-Replicas),包含了 Leader 副本和所有与 Leader 副本保持同步的 Follower 副本
写请求首先由 Leader 副本处理,之后 Follwer 副本会从 Leader 上拉取写入的消息,这个过程会有一定延迟,导致 Follower 副本中保存的消息略少于 Leader 副本,但是只要没有超出阈值都可以容忍,但是一旦一个 Follower 副本出现异常,那这是 Leader 就会把他踢出去,Kafka 通过 ISR 集合来维护一个 “ 可用且消息量与 Leader 差不多的副本集合,他是整个副本的一个子集”
在 kafka 中,一个副本要成为 ISR 副本,要满足以下条件:
- Leader 副本本身就是一个 ISR 副本
- Follower 副本的最后一条消息的 offset 与 leader 副本的最后一条消息的 offset 之间的差值不能超过指定的阈值,超过阈值则该 Follower 副本将会从 ISR 列表中删除
- replica.lag.time.max.ms:默认是 30 秒;如果该 follower 在此时间间隔内一直没有追上过 Leader 副本就会被 ISR 集合剔除
- replica.lag.max.messages:落后了多少条消息时,该 Follower 副本就会被剔除 ISR 列表,该配置参数现在在新版本已经过时了。
8.2 LEO
日志末端偏移量(Log End Offset),该消息日志中下一条消息的偏移量
8.3 HW
HW(High Watermark),即高水位,它表示一个偏移量 offset 信息,表示消息的复制进度,也就是消息已经成功复制到哪个位置了。即在 HW 之前的所有消息都已经成功写入副本中,并且可以在所有的副本中找到,因此,消费者可以安全的消费这些消息。而对于消费者而言,它只能拉取 HW之前的消息,对于这之后未同步完成的消息,是不可见的。
相关文章:
Kafka 入门与实战
一、Kafka 基础 1.1 创建topic kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create 1.2 查看消费者偏移量位置 kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test 1.3 消息的生产与发送 #生产者 kafka-cons…...
VM虚拟机安装群晖系统
下载群晖系统 https://download.csdn.net/download/hmxm6/90351935 安装群晖连接软件 synology-assistant-6.2-24922(在上面的压缩包里面) 准备好VM虚拟机 创建群晖虚拟机 打开下载下来的虚拟机 添加硬盘 选择类型 创建新的磁盘 指定容量 指定存储文件 完成硬盘添加…...
关于ESP-IDF 5.4 中添加第三方组件esp32-camera找不到文件,编译错误解决办法(花了一天时间解决)
最近需要使用ESP32-S3-CAM 的OV2640摄像头采集图像,为了加速开发进度,于是选择了esp32-camera组件,该组件不是官方组件,需要自己git clone。但在为项目添加esp32-camera组件时,一直编译错误,找不到头文件&a…...
【C++】C++11
目录 C11简介 统一的列表初始化 {}初始化 std::initializer_list 声明 auto decltype nullptr 范围for循环 智能指针 STL中的一些变化 右值引用和移动语义 左值引用和右值引用 右值引用的意义 完美转发 lambda表达式 新的类功能 可变参数模版 包装器 func…...
Intellij IDEA如何查看当前文件的类
快捷键:CtrlF12,我个人感觉记快捷键很麻烦,知道具体的位置更简单,如果忘了快捷键(KeyMap)看一下就记起来了,不需要再Google or Baidu or GPT啥的,位置:Navigate > Fi…...
CF 278A.Circle Line
题目分析 输入n个数据作为路径,求从a到b的最短距离,需要将其相成一个圆圈,既可以从小往大走又可以从大往小走 思路分析 依然将数据存为数组,通过下标进行操作,既然说了有两种方式那就计算两种方式哪个更快就输出谁 代…...
Naive UI去掉n-select下拉框边框,去掉n-input输入框边框
1、第一种通过js去掉 <template><div><div style"margin-top:10px;width: 100%;"><dade-descriptions><tr><dade-descriptions-item label"代理名称"><dade-input placeholder"代理名称"></dade-…...
(文末提供数据集下载)ML.NET库学习001:基于PCA的信用卡异常检查之样本处理与训练
文章目录 (文末提供数据集下载)ML.NET库学习001:基于PCA的信用卡异常检查之样本处理与训练目标项目概述代码结构概述1. **主要类和文件**2. **命名空间和使用指令**3. **数据类 (TransactionObservation)**4. **主程序入口 (Main 方法)**5. **数据预处理 (DataPrepr…...
疯狂SQL转换系列- SQL for Milvs2.4
鉴于Milvus仍在不停的迭代新版本,推出新功能,其SDK目前并不稳定。目前其2.4版本的SDK接口已与之前的2.2版本有了较大的差别,功能上也有了一定的调整。为此,我们重新提供了针对[Milvus2.4](https://github.com/colorknight/moql-tr…...
C++的 I/O 流
本文把复杂的基类和派生类的作用和关系捋出来,具体的接口请参考相关文档 C的 I/O 流相关的类,继承关系如下图所示 https://zh.cppreference.com/w/cpp/io I / O 的概念:内存和外设进行数据交互称为 I / O ,例如:把数…...
基于ansible部署elk集群
ansible部署 ELK部署 ELK常见架构 (1)ElasticsearchLogstashKibana:这种架构是最常见的一种,也是最简单的一种架构,这种架构通过Logstash收集日志,运用Elasticsearch分析日志,最后通过Kibana中…...
4.Python字符串和列表:字符串输入、字符串输出、下标和切片、字符串常见函数、列表(list)、列表的循环遍历、列表的增删改查、列表的嵌套、列表的切片
1. Python 字符串 1.1 字符串输入 input() 函数用于从用户获取字符串输入。它总是返回一个字符串类型的值。 # 从用户输入字符串 name input("请输入你的名字:") print(f"你好, {name}")1.2 字符串输出 字符串的输出通常使用 print() 函数…...
51单片机之使用Keil uVision5创建工程以及使用stc-isp进行程序烧录步骤
一、Keil uVision5创建工程步骤 1.点击项目,新建 2.新建目录 3.选择目标机器,直接搜索at89c52选择,然后点击OK 4.是否添加起吊文件,一般选择否 5.再新建的项目工程中添加文件 6.选择C文件 7.在C文件中右键,添加…...
Redis - 全局ID生成器 RedisIdWorker
文章目录 Redis - 全局ID生成器 RedisIdWorker一、引言二、实现原理三、代码实现代码说明 四、使用示例示例说明 五、总结 Redis - 全局ID生成器 RedisIdWorker 一、引言 在分布式系统中,生成全局唯一ID是一个常见的需求。传统的自增ID生成方式在分布式环境下容易出…...
Linux ftrace 内核跟踪入门
文章目录 ftrace介绍开启ftraceftrace使用ftrace跟踪指定内核函数ftrace跟踪指定pid ftrace原理ftrace与stracetrace-cmd 工具KernelShark参考 ftrace介绍 Ftrace is an internal tracer designed to help out developers and designers of systems to find what is going on i…...
Visual Studio(VS)没有显示垂直滚轮or垂直滚轮异常显示
前言: 前段时间,我换上了新电脑。满心欢喜地安装好 VS,准备大干一场时,却发现了一个小麻烦 —— 垂直滚轮显示异常(如图 1)。这种显示方式实在让我难以适应,每一次操作都觉得别扭。 于是&#…...
大数据数仓实战项目(离线数仓+实时数仓)3
1.课程内容和课程目标 2.订单时间维度指标需求分析 根据时间数据,生成一个时间维度表,我们后面还可以去复用这个时间维度表 3.使用kettle生成日期维度数据 Hive创建日期维度表 使用Kettle构建以下组件结构图 使用kettle生成日期维度数据插入到我们的hi…...
通过acme生成与续签ssl证书,并部署到nginx
通过acme生成与续签ssl证书,并部署到nginx 介绍 官方介绍: acme.sh 实现了 acme 协议,可以从 ZeroSSL,Lets Encrypt 等 CA 生成免费的证书。 安装 acme.sh 1. curl方式 curl https://get.acme.sh | sh -s emailmyexample.com…...
c语言对应汇编写法(以中微单片机举例)
芯片手册资料 1. 赋值语句 C语言: a 5; b a; 汇编: ; 立即数赋值 LDIA 05H ; ACC 5 LD R01,A ; R01 ACC(a5); 寄存器间赋值 LD A,R01 ; ACC R01(读取a的值) LD R02,A ; R02 ACC&…...
React基础内容(面试一)
React大厂常见的面试题目涉及多个方面,包括React的基本概念、组件、状态管理、生命周期、性能优化等。以下是对这些面试题目的详细解析: 一、React基本概念 解释React是什么以及它的主要特点 React是一个用于构建用户界面的JavaScript库,由F…...
2025年软件测试五大趋势:AI、API安全、云测试等前沿实践
随着软件开发的不断进步,测试方法也在演变。企业需要紧跟新兴趋势,以提升软件质量、提高测试效率,并确保安全性,在竞争激烈的技术环境中保持领先地位。本文将深入探讨2025年最值得关注的五大软件测试趋势。 Parasoft下载https://…...
常用工具类——Collections集合框架
常用工具类——Collections集合框架 Collections 是 JDK 提供的一个工具类,提供了一系列静态方法,分类来复习! 1.排序操作 reverse(List list) :反转顺序shuffle(List list) : 洗牌,将顺序打乱sort(List list) &…...
【大数据技术】搭建完全分布式高可用大数据集群(ZooKeeper)
搭建完全分布式高可用大数据集群(ZooKeeper) apache-zookeeper-3.8.4-bin.tar.gz注:请在阅读本篇文章前,将以上资源下载下来。 写在前面 本文主要介绍搭建完全分布式高可用集群 ZooKeeper 的详细步骤。 注意: 统一约定将软件安装包存放于虚拟机的/software目录下,软件…...
Docker Desktop安装kubernetes时一直在Starting:Kubernetes failed to start
原因:由于墙的问题,导致拉取国外的K8s镜像失败 解决: 下载 k8s-for-docker-desktop 选中自己的kubernetes 版本 下载zip包 PowerShell运行load_images.ps1文件 重启docker kubernetes运行成功...
物流中的物联网:其含义、应用和优势
随着世界联系日益紧密,物流格局正经历重大变革。科技已成为供应链管理的支柱,推动物流公司迈入效率与连通性兼具的新时代。 物联网(IoT)是一股变革性力量,重塑着物流与运输行业的架构。物联网在物流领域并非昙花一现的…...
Axure设计教程:动态排名图(中继器实现)
一、开篇 在Axure原型设计中,动态图表是展示数据和交互效果的重要元素。今天,我们将学习如何使用中继器来创建一个动态的排名图,该图表不仅支持自动轮播,还可以手动切换,极大地增强了用户交互体验。此教程旨在提供一个…...
【人工智能】掌握图像风格迁移:使用Python实现艺术风格的自动化迁移
《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 解锁Python编程的无限可能:《奇妙的Python》带你漫游代码世界 图像风格迁移(Image Style Transfer)是一种基于深度学习的计算机视觉技术,通过将一张图像的内容与另一张图像的艺术风格结合,生成一幅具…...
# C指针地址CUP寄存器访问IO内存映射
C指针地址&CUP寄存器访问&IO内存映射 在裸机编程中,C语言可以像汇编语言一样直接操作芯片寄存器地址进行读取和写入,主要是由于以下几个原因: 1. 裸机环境下没有操作系统的干预 裸机编程是指直接在硬件上运行程序,没有…...
UE5 蓝图学习计划 - Day 14:搭建基础游戏场景
在上一节中,我们 确定了游戏类型,并完成了 项目搭建、角色蓝图的基础设置(移动)。今天,我们将进一步完善 游戏场景,搭建 地形、墙壁、机关、触发器 等基础元素,并添加角色跳跃功能,为…...
浅尝yolo11全程记录1-准备环境+官网模型推理(个人备份)
准备工作(虚拟环境、导入项目) 安装Anaconda 主要是为了创建和管理虚拟环境,在pycharm里按照项目里的requirments.txt安装依赖的时候,使用虚拟环境会好很多(我记得不用Anaconda也可以直接在pycharm的terminal里头创建…...
用 Python 给 Excel 表格截图(20250207)
我搜索了网络上的方案,感觉把 Excel 表格转换为 HTML 再用 platwright 截图是比较顺畅的路径,因为有顺畅的工具链。如果使用的是 Windows 系统则不需要阅读此文,因为 win32com 库更方便。这篇文章中 Excel 转 HTML 的方案,主要弥补…...
Office/WPS接入DS等多个AI工具,开启办公新模式!
在现代职场中,Office办公套件已成为工作和学习的必备工具,其功能强大但复杂,熟练掌握需要系统的学习。为了简化操作,使每个人都能轻松使用各种功能,市场上涌现出各类办公插件。这些插件不仅提升了用户体验,…...
智能化转型2.0:从“工具应用”到“价值重构”
过去几年,“智能化”从一个模糊的概念逐渐成为企业发展的核心议题。2024年,随着生成式AI、大模型、智能体等技术的爆发式落地,中国企业正式迈入智能化转型的2.0时代。这一阶段的核心特征是从单一场景的“工具应用”转向全链条的“价值重构”&…...
深度整理总结MySQL——索引工作原理
B树索引数据结构 前言什么样的索引数据结构是好的搜索速度要求支持范围查找寻求适合查找的算法寻求合适的数据结构二叉查找树自平衡二叉树B树B树数据结构B与B树比较 总结 前言 相信你在面试时,通常会被问到“什么是索引?”而你一定要能脱口而出…...
基于asr的所见即可说方案
年前写的文章对所见即可说方案进一步调研-CSDN博客,针对rk3568定制版,进行了Accessibility实现所见即可说功能的验证与调研,结论是不可行。 最终解决方案是:结合科大讯飞的AI大模型智能助手,使用rk3588板(…...
【截图】selenium自动通过浏览器截取指定元素div的图片
【截图】selenium自动通过浏览器截取指定元素div的图片 思路 截取完整网页截图 通过元素的坐标 截图到指定位置的图片 前提是已经获取到 driver 了 # 定位目标divtarget_div driver.find_element(By.CLASS_NAME, headlines-right)# 获取div的位置和大小location target_div…...
CSS入门学习笔记(一)
学习视频:https://www.bilibili.com/video/BV1zN2UYoEEo/ 目录 基本介绍语法引入方式内联样式(行内样式)内部样式外部样式 选择器四种选择器全局选择器元素选择器类选择器id选择器 合并选择器选择器的优先级 字体属性colorfont-sizefont-weig…...
docker安装es及分词器ik
系统是macos,docker是docker-desktop 拉取镜像 docker pull bitnami/elasticsearch 启动docker镜像 docker create -e "discovery.typesingle-node" \ --name elasticsearch1 -p 9200:9200 -p 9300:9300 \ bitnami/elasticsearch:8.17.1 测试是否好…...
11. 9 构建生产级聊天对话记忆系统:从架构设计到性能优化的全链路指南
构建生产级聊天对话记忆系统:从架构设计到性能优化的全链路指南 关键词: 聊天对话记忆系统、多用户会话管理、LangChain生产部署、Redis记忆存储、高并发对话系统 一、服务级聊天记忆系统核心需求 多用户隔离:支持同时处理数千个独立对话持久化存储:对话历史不因服务重启丢…...
SpringBoot启动源码剖析:从入口到容器的诞生
文章目录 SpringBoot启动的核心入口SpringApplication的初始化SpringBoot的启动流程1. 准备环境(Environment)2. 创建应用上下文(ApplicationContext)3. 刷新应用上下文(Refresh Context)4. 调用Runner接口…...
Day38【AI思考】-彻底打通线性数据结构间的血脉联系
文章目录 **彻底打通线性数据结构间的血脉联系****数据结构家族谱系图****一、线性表(老祖宗的规矩)****核心特征** **二、嫡系血脉解析**1. **数组(规矩森严的长子)**2. **链表(灵活变通的次子)** **三、庶…...
虚拟鼠标MATVT:遥控器操控的安卓电视增强工具
虚拟鼠标MATVT:遥控器操控的安卓电视增强工具 matvt Virtual Mouse for Android TV that can be controlled via remote itself. 项目地址: https://gitcode.com/gh_mirrors/ma/matvt 项目基础介绍与编程语言 虚拟鼠标MATVT(matvt)是…...
优惠券平台(一):基于责任链模式创建优惠券模板
前景概要 系统的主要实现是优惠券的相关业务,所以对于用户管理的实现我们简单用拦截器在触发接口前创建一个单一用户。 // 用户属于非核心功能,这里先通过模拟的形式代替。后续如果需要后管展示,会重构该代码 UserInfoDTO userInfoDTO new…...
从零开始:OpenCV 图像处理快速入门教程
文章大纲 第1章 OpenCV 概述 1.1 OpenCV的模块与功能 1.2 OpenCV的发展 1.3 OpenCV的应用 第2章 基本数据类型 2.1 cv::Vec类 2.2 cv::Point类 2.3 cv::Rng类 2.4 cv::Size类 2.5 cv:&…...
计算机网络-SSH基本原理
最近年底都在忙,然后这两天好点抽空更新一下。前面基本把常见的VPN都学习了一遍,后面的内容应该又继续深入一点。 一、SSH简介 SSH(Secure Shell,安全外壳协议)是一种用于在不安全网络上进行安全远程登录和实现其他安…...
数据库性能优化(sql优化)_统计信息_yxy
数据库性能优化_统计信息理解 1 什么是数据库统计信息?2 统计信息不准确3 统计信息分类3.1 表统计信息3.2 列统计信息3.3 索引统计信息4 统计方式4.1 频率直方图4.2 等高直方图5 总结1 什么是数据库统计信息? 数据库中同一个sql有非常多种执行方式,每种执行方式的代价肯定不…...
QT通过setProperty设置不同QSS样式
如上切换效果就是通过setProperty来实现切换不同颜色的。 实现以上效果第一步,需要在QSS中做属性处理。 QLabel{color:red;} QLabel[status"1"]{color:black;} QLabel[status"2"]{color:white;} QLabel[status"3"]{color:blue;} QLa…...
基础入门-算法解密散列对称非对称字典碰撞前后端逆向MD5AESDESRSA
知识点: 0、算法类型-单向散列&对称性&非对称性 1、算法识别加解密-MD5&AES&DES&RSA 2、解密条件寻找-逻辑特征&源码中&JS分析 应用场景: 1、发送数据的时候自动将数据加密发送(只需加密即可) 安全…...
VsCode创建VUE项目
1. 首先安装Node.js和npm 通过网盘分享的文件:vsCode和Node(本人电脑Win11安装) 链接: https://pan.baidu.com/s/151gBWTFZh9qIDS9XWMJVUA 提取码: 1234 它们是运行和构建Vue.js应用程序所必需的。 1.1 Node安装,点击下一步即可 …...
【DeepSeek】DeepSeek小模型蒸馏与本地部署深度解析DeepSeek小模型蒸馏与本地部署深度解析
一、引言与背景 在人工智能领域,大型语言模型(LLM)如DeepSeek以其卓越的自然语言理解和生成能力,推动了众多应用场景的发展。然而,大型模型的高昂计算和存储成本,以及潜在的数据隐私风险,限制了…...