kafka生产者专题(原理+拦截器+序列化+分区+数据可靠+数据去重+事务)
目录
- 生产者
- 发送数据原理
- 参数说明
- 代码示例(同步发送数据)
- 代码示例(异步)
- 异步和同步的区别
- 同步发送
- 定义与流程
- 特点
- 异步发送
- 定义与流程
- 特点
- 异步回调
- 描述
- 代码示例
- 拦截器
- 描述
- 代码示例
- 消息序列化
- 描述
- 代码示例(自定义序列化)
- 分区
- 描述
- 分区策略
- 代码示例
- 写入默认分区(0号分区)
- 自定义分区机制
- 消息丢失
- 消息绝对不丢失的条件
- 数据去重
- 描述
- 幂等性
- 事务
- 代码示例(事务)
生产者
发送数据原理
说明
- 拦截器允许有多个,可以组成拦截器链
- 生产者发送的消息会被分配到不同的分区(Partition)。每个分区在内存中都有一个对应的缓冲区(RecordAccumulator),用于暂存即将发送的消息。sender线程从中读取数据
- sender线程两个重要参数(大小默认16KB,当数据量达到16KB时读取,读取时间默认0ms,当达到读取时间时,自动读取数据,不管大小有没有达到),这两个参数均可以调整
- NetworkClient负责将生产者的请求(如发送消息、获取元数据等)发送到相应的broker,并存储这些请求的响应;NetworkClient还负责处理网络连接的建立、维护和关闭。
- 当生产者发送消息到broker时,可以选择不同的应答级别(acks参数):
acks=0:生产者不等待broker的应答,直接认为消息发送成功。这种方式性能最高,但可靠性最低。
acks=1:生产者等待leader broker的应答,只要leader broker确认收到消息,就认为消息发送成功。这种方式性能较高,但可靠性略低。
acks=all(或acks=-1):leader和follower都落地回应,才认为消息发送成功。这种方式性能最低,但可靠性最高。
Broker在收到消息后,会根据配置的应答机制向生产者发送应答(或错误)信息。 - 请注意关于broker的落地是指数据存储到磁盘或持久化
- 数据发送成功后(接收到应答)删除缓冲区内对应的数据
参数说明
参数 | 默认值 | 作用描述 |
---|---|---|
bootstrap.servers | node2:9092[,node3:9092][,node4:9092] | 生产者连接集群所需的broker地址清单,一个或多个(逗号隔开) |
key.serializer | (无) | 指定发送消息的key的序列化类型,必须写全类名 |
value.serializer | (无) | 指定发送消息的value的序列化类型,必须写全类名 |
buffer.memory | 32M | RecordAccumulator缓冲区总大小 |
batch.size | 16K | 缓冲区一批数据最大值,适当增加可提高吞吐量,但可能增加延迟 |
linger.ms | 0ms (表示没有延迟) | 如果数据未达到batch.size,sender等待linger.time后发送数据 |
acks | -1 | 应答机制:0-不需要应答,1-Leader应答,-1(all)-所有节点应答 |
max.in.flight.requests.per.connection | 5 | 允许最多没有返回ack的次数,开启幂等性时建议1-5之间 |
enable.idempotence | true | 是否开启幂等性,默认开启 |
retries | 2147483647 (int最大值) | 消息发送错误时的重试次数 |
retry.backoff.ms | 100ms | 两次重试之间的时间间隔 |
compression.type | none | 生产者发送的所有数据的压缩方式,默认不压缩 |
代码示例(同步发送数据)
package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class SyncCustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop = new Properties();//TODO 2.为配置文件对象设置参数//TODO 2.1 配置bootstrap_server(生产者连接集群所需的broker地址清单)prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");//TODO 2.2 配置key和value的序列化类// 设置序列化器:指定key和value的序列化类为StringSerializer,用于将字符串类型的key和value转换为字节数组,以便发送到Kafka。prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducer<String,String> producer =new KafkaProducer<String, String>(prop);//TODO 4.发送消息// producer.send(...).get():同步发送消息,send()方法返回一个Future对象,调用get()方法等待发送完成并获取结果。for(int i = 0;i<5;i++){//同步发送消息producer.send(new ProducerRecord<>("topicA","sync_msg"+i)).get();}//TODO 5.关闭生产者producer.close();}
}
代码说明
类/对象 | 描述 | 用途 |
---|---|---|
Properties | Java标准库中的类,用于维护键值对列表。 Properties类提供了一种方便的方式来读取和写入属性文件(通常是.properties文件) | 在本代码中用于存储Kafka生产者的配置参数。 |
KafkaProducer<K, V> | Kafka客户端库中的类,用于向Kafka主题发送消息。泛型参数K 和V 分别表示消息键和值的类型。 | 创建生产者实例,发送消息到Kafka主题。 |
ProducerConfig | Kafka客户端库中的类,包含生产者配置的常量。 | 提供配置参数的常量值,如broker地址、序列化器等。 |
ProducerRecord<K, V> | Kafka客户端库中的类,表示要发送到Kafka主题的消息记录。泛型参数K 和V 分别表示消息键和值的类型。 | 创建消息记录对象,包含主题、键和值。 |
StringSerializer | Kafka客户端库中的类,实现了Serializer<String> 接口,用于将字符串类型的键或值序列化为字节数组。 | 作为键和值的序列化器,将字符串转换为字节数组进行传输。 |
效果
代码示例(异步)
package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class UnSyncCustomProducer {public static void main(String[] args) {//实例化PropertiesProperties prop = new Properties();//集群节点prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");//key和valueprop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//创建kafka生产者对象,并写入响应参数KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);//发送数据for (int i = 0; i < 5; i++) {//异步发送数据,不调用get方法producer.send(new ProducerRecord<>("topicA", "unsync_msg" + i));}producer.close();}
}
异步和同步的区别
同步发送
定义与流程
定义: 同步发送是指生产者在发送一条消息后,会立即等待Kafka服务器的响应。只有在服务器返回成功响应后,生产者才会继续发送下一条消息。
流程:
生产者调用send()方法发送消息。
send()方法返回一个Future对象。
生产者调用Future对象的get()方法,该方法会阻塞当前线程,直到Kafka服务器返回响应或抛出异常。
生产者收到响应后,根据结果(成功或失败)进行后续操作。
特点
高可靠性:同步发送确保每条消息都被Kafka集群接收并持久化。生产者等待Kafka确认消息已经成功写入指定分区且复制到满足副本因子的节点上,从而提高了消息的可靠性。
异常处理:如果发送过程中发生异常,生产者可以立即感知并处理,避免了消息的丢失。
性能较低:由于同步发送需要阻塞等待响应,因此会增加消息的延迟,降低系统的吞吐量。特别是在高并发场景下,可能会导致线程资源的大量占用和性能瓶颈。
易调试:便于发现和处理异常,有利于开发和测试阶段的调试工作。
异步发送
定义与流程
定义: 异步发送是指生产者在发送一条消息后,不会立即等待Kafka服务器的响应,而是继续发送下一条消息。发送方通过传递一个回调函数给send()方法,该回调函数将在消息发送结果(成功或失败)可用时被异步调用。
流程:
生产者调用send()方法发送消息,并传递一个回调函数。
Kafka客户端将消息放入内部缓冲区,并立即返回。
Sender线程负责将缓冲区中的消息批量发送到Kafka集群。
当消息发送成功或失败时,Kafka客户端调用之前传递的回调函数,通知生产者消息发送的结果。
特点
高性能:异步发送方式下,生产者无需等待每个消息的确认即可继续发送下一条消息,从而提高了消息的发送效率,适用于高吞吐量场景。
灵活性:通过回调函数,生产者可以对消息发送的结果进行异步处理,如记录日志、重试发送等。
可靠性相对较低:由于生产者不会立即得知消息是否成功写入Kafka,因此消息的可靠性需要额外关注。如果生产者在发送消息后立即崩溃,可能会导致部分消息丢失。
调试复杂:由于消息发送和结果是异步的,因此调试时可能需要更多的日志记录和监控手段来确保消息的可靠性和完整性。
异步回调
描述
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。消息发送失败后会自动重试,不需要再回调函数中手动重试。
代码示例
package com.wunaiieq;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class UnSyncCallBackCustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop = new Properties();//TODO 2.为配置文件对象设置参数// 2.1 配置bootstrap_serversprop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");// 2.2 配置key和value的序列化类prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);//TODO 4.发送消息for(int i = 0;i<5;i++){//异步发送消息 不调用get()方法producer.send(new ProducerRecord<>("topicA", "unsync_msg" + i),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,笑死发送成功,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}//TODO 5.关闭生产者producer.close();}
}
拦截器
描述
拦截器(Interceptor)是kafka0.10.0.0版本中引入的新功能,主要用于实现clients端的定制化控制逻辑。它可以使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时允许指定多个Interceptor按序作用于同一条消息从而形成一个拦截器链(Interceptor Chain)。
自定义拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
拦截器内部方法
-
onSend
方法- 作用:在消息发送之前进行拦截,允许对消息进行修改或处理。
- 参数:接收一个
ProducerRecord
对象。 - 返回值:返回一个
ProducerRecord
对象,可能是修改后的记录。 - 应用场景:添加消息头、修改消息内容、过滤消息等。
-
onAcknowledgement
方法- 作用:在消息发送成功或失败后进行回调。
- 参数:
RecordMetadata
:包含消息的元数据。Exception
:发送过程中可能抛出的异常,成功时为null
。
- 返回值:无。
- 应用场景:记录发送结果、统计发送成功率、处理发送失败等。
-
close
方法- 作用:在拦截器不再使用时进行资源清理。
- 参数:无。
- 返回值:无。
- 应用场景:关闭打开的文件、释放内存、断开网络连接等。
拦截器Interceptor可能运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外,若指定了多个Interceptor,则producer将按照指定顺序调用它们,同时把每个Interceptor中捕获的异常记录写入到错误日志中而不是向上传递。这在使用过程中要特别留意。
代码示例
实现一个简单的双interceptor组成的拦截链。
第一个Interceptor会在消息发送前将时间戳信息加到消息value的前面
第二个Interceptor在消息发送后更新成功发送消息数或失败发送消息数。
第一个拦截器示例
package com.wunaiieq;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class TimeStampInterceptor implements ProducerInterceptor<String,String> {/**初始化拦截器,并接收Kafka生产者的配置参数。* */@Overridepublic void configure(Map<String, ?> configs) {}/**发送之前被调用,对消息进行处理。* */@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return new ProducerRecord<String, String>(//原始消息记录的主题、分区、时间戳和键。record.topic(),record.partition(),record.timestamp(), record.key(),//将当前系统时间戳(System.currentTimeMillis())和原始消息值拼接成新的消息值,中间用逗号分隔。System.currentTimeMillis()+","+record.value());}/**消息发送成功或失败后被调用。* */@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}/**在拦截器不再使用时进行资源清理。* */@Overridepublic void close() {}
}
第二个拦截器示例
package com.wunaiieq;import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class CounterIntercepter implements ProducerInterceptor<String,String> {private int errorCounter = 0;private int successCounter = 0;/**onSend方法,该方法在消息发送之前被调用,用于对消息进行处理。* 由于这是第二个拦截器,因此这里接受的是前一个拦截器的输出* */@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}/**在消息发送成功或失败后被调用。* 统计消息发送成功或失败的数量* */@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(exception==null){successCounter++;}else{errorCounter++;}}/**拦截器关闭时会进行的额外操作* 打印成功或失败的消息数量* */@Overridepublic void close() {System.out.println("successful send:"+successCounter);System.out.println("failed send:"+errorCounter);}@Overridepublic void configure(Map<String, ?> configs) {}
}
拦截器调用
package com.wunaiieq;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class SyncCustomProducerInterceptor {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 构造拦截器链List<String> interceptors = new ArrayList<>();interceptors.add("com.wunaiieq.TimeStampInterceptor");interceptors.add("com.wunaiieq.CounterIntercepter");//配置拦截器链(将拦截器链加入到配置文件中)prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);KafkaProducer<String, String> producer =new KafkaProducer<String, String>(prop);for (int i = 5; i < 10; i++) {//同步发送消息producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();}//一定要关闭生产者,这样才会调用interceptor的close方法producer.close();}
}
效果
消息序列化
描述
消息序列化是将对象转换为字节流的过程。在Kafka中,生产者需要将消息对象序列化为字节流,以便通过网络发送给Kafka集群;而消费者则需要将从Kafka集群接收到的字节流反序列化为对象,以便进行后续处理。
代码示例(自定义序列化)
pom.xml
增加依赖
<dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><version>1.9.13</version></dependency>
UserVo.java
值对象
package com.wunaiieq;public class UserVo {private String name;private int age;private String address;public UserVo(String name, int age, String address) {this.name = name;this.age = age;this.address = address;}@Overridepublic String toString() {return "UserVo{" +"name='" + name + '\'' +", age=" + age +", address='" + address + '\'' +'}';}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}
}
UserSerializer.java
重写Serializer接口实现序列化操作
package com.wunaiieq;import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;public class UserSerializer implements Serializer<UserVo> {private ObjectMapper objectMapper;@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {objectMapper = new ObjectMapper();//Serializer.super.configure(configs, isKey);}/*** @param topic 消息将要发送到的主题名* @param data 需要序列化的UserVo对象。* */@Overridepublic byte[] serialize(String topic, UserVo data) {//存储序列化后的字节数组byte[] ret = null;try {//data写成JSON字符串再写成UTF_8的字节数组ret = objectMapper.writeValueAsString(data).getBytes(StandardCharsets.UTF_8);} catch (IOException e) {throw new SerializationException("Error when serializing UserVo to byte[],exception is " + e.getMessage());}return ret;}@Overridepublic void close() {objectMapper = null;//Serializer.super.close();}
}
UserSerProducer.java
调用自定义序列化机制
package com.wunaiieq;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class UserSerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");// TODO 不用修改key的序列化机制,后续没用到prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 修改value的序列化机制prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());KafkaProducer<String,UserVo> producer = new KafkaProducer<String, UserVo>(prop);UserVo userVo = new UserVo("wunaiieq",18,"北京");producer.send(// TODO 关于消息记录的构造中,可以指定 1.主题、值 2.主题、键、值new ProducerRecord<String,UserVo>("topicA", userVo),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(50);producer.close();}
}
效果
前面的不用管,只是没清空而已
分区
描述
分区位于拦截器链后面
生产者分区的优势:
-
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
-
提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
-
分区后,更方便于做副本备份,提高了数据安全性。
分区策略
以下为提供的默认分区策略,自行选择即可
- 轮询策略(Round-Robin Strategy)
原理:按照顺序将消息发送到不同的分区,每个消息被发送到其对应分区,循环轮询每个分区,确保消息在所有分区之间均匀分布。
特点:适用于生产者不需要根据消息内容或键选择特定分区的场景,能够实现负载均衡,最大限度地利用集群资源。
默认情况:这是Kafka Java生产者API默认提供的分区策略。如果没有指定分区策略,则会默认使用轮询策略。 - 按键分配策略(Key-Based Partitioning)
原理:消息的键被用作决定消息分区的依据。生产者将消息的键发送给Kafka,Kafka根据键的哈希值将消息路由到相应的分区。
特点:适用于键值对的数据结构,通过将具有相同键的消息发送到同一分区,可以提高数据局部性和处理效率。同时,能够保证具有相同键的消息顺序性。 - 范围分区策略(Range Partitioning)
原理:根据消息键的范围将消息分配到不同的分区。每个分区包含一个键值范围内的消息。
特点:适用于有序数据的处理,如时间戳或递增的ID。通过将具有相似时间戳或递增ID的消息分配到同一分区,可以提高处理效率并保证数据的顺序性。 - 自定义分区策略(Custom Partitioning)
原理:用户可以根据特定的业务逻辑或规则来决定消息的分区。通过实现自定义的分区器类,根据应用程序的需求来定义分区的逻辑。
特点:提供了更高的灵活性,可以根据地理位置、用户ID或其他业务规则来决定消息的分区。
实现方式:实现org.apache.kafka.clients.producer.Partitioner接口,并重写partition、close和configure方法。其中,partition方法是核心,用于根据给定的键、值和分区信息来计算分区号。 - 粘性分区策略(Sticky Partitioning)
原理:尽可能将消息分配到与之前消息相同的分区,以减少跨分区的数据移动和复制。通过维护一个分区和消费者的映射关系来实现。
特点:在消费者组或分区数量发生变化时,能够尽可能减少对现有分区分配的影响,减少负载均衡的开销,提高处理效率。
代码示例
写入默认分区(0号分区)
消息只会发送到指定的分区内部
package com.wunaiieq.partition;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class ProducerToPartition {public static void main(String[] args) throws ExecutionException, InterruptedException {//TODO 1.声明并实例化Kafka Producer的配置文件对象Properties prop = new Properties();//TODO 2.为配置文件对象设置参数// 2.1 配置bootstrap_serversprop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");// 2.2 配置key和value的序列化类prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 3.声明并实例化生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);//TODO 4.发送消息for(int i = 0;i<5;i++){//指定数据发送到0号分区,key为nullproducer.send(new ProducerRecord<>("topicA",0,null, "unsync_msg" + i),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}//TODO 5.关闭生产者producer.close();}
}
自定义分区机制
部分消息可能需要额外的处理内容,比如审计等等,这类消息的key会携带关键字符串“wunaiieq”,现在让其发送到topicA主题的最后一个分区上,以便于后续处理,其他的消息则随机发送(不包括最后一个分区)
WunaiieqPartitioner.java
分区写入策略
package com.wunaiieq.partition;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;import java.util.List;
import java.util.Map;
import java.util.Random;public class WunaiieqPartitioner implements Partitioner {private Random random;@Overridepublic void configure(Map<String, ?> configs) {//该方法实现必要资源的初始化工作random = new Random();}/** 计算信息对应的分区* @param topic 主题* @param key 消息的key* @param keyBytes 消息的key序列化后的字节数组* @param value 消息的value* @param valueBytes 消息value序列化后的字节数组* @param cluster 集群元数据 可以获取分区信息* @return 息对应的分区号*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//将key转换为字符串String keyInfo = (String)key;//获取主题的分区对象列表List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);//获取主题下的分区总数量int partCount = partitionInfoList.size();if (partCount <= 1) {System.out.println("1 partition");return 0; // 只有一个分区时,直接返回0}//最后一个分区号int wunaiieqPartition = partCount-1;//如果 key 为空、key 为空字符串或 key 不包含 "wunaiieq",则随机选择一个除最后一个分区外的分区;否则,消息发送到最后一个分区。return keyInfo==null || keyInfo.isEmpty()||!keyInfo.contains("wunaiieq")? random.nextInt(partCount-1) : wunaiieqPartition ;}@Overridepublic void close() {//该方法实现必要资源的清理工作random = null;}
}
CustomPartitionerProducer.java
调用分区策略
package com.wunaiieq.partition;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomPartitionerProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// TODO 在用于构造KafkaProducer的Properties对象中设置partitioner.class参数prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.wunaiieq.partition.WunaiieqPartitioner");KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);for(int i = 0;i<5;i++){// TODO 不指定分区号,key为"wunaiieq"测试运行一次,改为"kafka"后再测试一次。producer.send(new ProducerRecord<>("topicA","aa", "unsync_msg" + i),new Callback() {//如下方法在生产者收到acks确认时异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e == null){//无异常信息,输出主题和分区信息到控制台System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());}else{//打印异常信息System.out.println(e.getMessage());}}});Thread.sleep(5);}producer.close();}
}
消息丢失
判断消息丢失时,一般看应答机制
- acks=0:因为不需要等待leader数据持久化就完成应答,leader宕机后可能存在数据丢失(follower内部数据从leader中同步,leader没有---->>follower也没有)
- acks=1:此时leader持久化完成应答(但是follower可能没有完成数据同步,leader宕机,导致数据丢失)一般用于传输普通日志
- acks=all(或acks=-1):leader和follower都持久化后并回应,才认为消息发送成功。这种方式性能最低,但可靠性最高。传输重要数据。
特殊情况
在acks=-1或all的情况下,Leader接收到数据并持久化后,所有Follower开始同步Leader刚刚持久化的数据,但是有一个Follower因故障迟迟不能进行数据同步,该问题应该怎么解决?
Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。
该时间阈值由replica.lag.time.max.ms参数设定,默认30000ms。例如1超时,(leader:0, isr:0,2)。这样就不用等长期联系不上或者已经故障的节点。
消息绝对不丢失的条件
- ACK级别设置为-1
- 分区副本>=2
- ISR应答的最小副本数>=2 (最小副本数有min.insync.replicas设置,默认为1)
生产中配置响应级别代码块:
// 设置 acks
prop.put(ProducerConfig.ACKS_CONFIG, "all");
//重试次数 retries,默认是 int 最大值,2147483647
prop.put(ProducerConfig.RETRIES_CONFIG, 3);
数据去重
描述
数据重复的原因
- 当生产者发送消息到Kafka集群时,如果由于网络故障或Kafka Broker的临时问题导致消息发送失败,生产者通常会进行重试。如果重试时Kafka Broker已经成功处理了之前的消息但尚未发送确认(ACK),那么重试发送的消息就会导致数据重复。
- 网络延迟或不稳定可能导致消息发送失败,生产者会进行重试,从而增加消息重复的风险。
- 如果Kafka Broker在消息发送成功后崩溃,但在发送确认(ACK)之前崩溃,生产者可能会重试发送相同的消息,导致消息重复。
数据去重
-
至少一次(At Least Once):ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2(保证数据绝对不丢失,但不能保证数据不重复)
-
最多一次(At Most Once):ACK级别设置为0(保证数据绝对不重复,但不能保证数据不丢失)
-
精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
- 精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。
- 重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复。
使用幂等性的使用
开启参数 enable.idempotence 默认为 true,false 关闭。
事务
- 寻找事务协调器:KafkaProducer 使用 trans.id 寻找事务协调器 (Transaction Coordinator)。
- 协调器通过 broker0 返回事务协调器的地址,包括事务信息的主题和分区领导者 (transaction_state-分区-Leader)。
- 请求 partId(开启幂等性): KafkaProducer向事务协调器请求 partId,并开启幂等性。
- 事务协调器接收到请求后,将请求持久化,并返回 partId给 KafkaProducer。
- 发送消息: KafkaProducer 使用返回的 partId 发送消息到指定主题的指定分区(topicA-Partition0 或 topicA-Partition1)。
6.发送 commit 请求: KafkaProducer发送 commit 请求到事务协调器,以提交事务。
7.事务协调器接收到 commit 请求后,将其持久化。- 事务成功:事务协调器确认事务成功,并返回成功信息给 KafkaProducer。
代码示例(事务)
package com.wunaiieq;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTransaction {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//TODO 设置事务idprop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_topicA_0");KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);//TODO 初始化事务producer.initTransactions();//TODO 开启事务producer.beginTransaction();//TODO 添加异常处理,成功提交事务,失败回滚事务try {//发送消息for (int i = 0; i < 5; i++) {//同步发送消息producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();}//TODO 提交事务producer.commitTransaction();}catch (Exception e){//TODO 放弃事务producer.abortTransaction();}producer.close();}
}
相关文章:
kafka生产者专题(原理+拦截器+序列化+分区+数据可靠+数据去重+事务)
目录 生产者发送数据原理参数说明代码示例(同步发送数据)代码示例(异步) 异步和同步的区别同步发送定义与流程特点 异步发送定义与流程特点 异步回调描述代码示例 拦截器描述代码示例 消息序列化描述代码示例(自定义序…...
谷歌2025年AI战略与产品线布局
在2024年12月的战略会议上,谷歌高层向员工描绘了2025年的宏伟蓝图,特别是在人工智能(AI)领域。这一年被定位为AI发展的关键转折点,谷歌计划通过一系列新产品和创新来巩固其在全球科技领域的领导地位。本文将深入探讨谷歌的2025年AI战略、重点产品以及竞争策略。 一、整体…...
Kernel Stack栈溢出攻击及保护绕过
前言 本文介绍Linux内核的栈溢出攻击,和内核一些保护的绕过手法,通过一道内核题及其变体从浅入深一步步走进kernel世界。 QWB_2018_core 题目分析 start.sh qemu-system-x86_64 \-m 128M \-kernel ./bzImage \-initrd ./core.cpio \-append "…...
QT-窗口嵌入外部exe
窗口类: #pragma once #include <QApplication> #include <QWidget> #include <QVBoxLayout> #include <QProcess> #include <QTimer> #include <QDebug> #include <Windows.h> #include <QWindow> #include <…...
38-其他地方使用模式
38-其他地方使用模式 模式除了可以在 match 表达式中使用外,还可以使用在变量定义(等号左侧是个模式)和 for in 表达式(for 关键字和 in 关键字之间是个模式)中。 但是,并不是所有的模式都能使用在变量定…...
才气小波与第一性原理
才气小波与第一性原理 才气小波与第一性原理具身智能云藏山鹰类型物热力学第二定律的动力机械外骨骼诠释才气小波导引社会科学概论软凝聚态数学意气实体过程王阳明代数Wangyangmingian王阳明算符才气语料库命运社会科学概论意气实体过程业务分层框架示例 才气小波与第一性原理 …...
104周六复盘 (188)UI
1、早上继续看二手书的一个章节,程序开发流程、引擎、AI等内容, 内容很浅,基本上没啥用,算是复习。 最大感触就是N年前看同类书的里程碑、AI相关章节时,会感觉跟自己没啥关系, 而如今则密切相关…...
CDP集群安全指南-动态数据加密
[〇]关于本文 集群的动态数据加密主要指的是加密通过网络协议传输的数据,防止数据在传输的过程中被窃取。由于大数据涉及的主机及服务众多。你需要更具集群的实际环境来评估需要为哪些环节实施动态加密。 这里介绍一种通过Cloudera Manager 的Auto-TLS功能来为整个…...
C# 设计模式:装饰器模式与代理模式的区别
C# 设计模式:装饰器模式与代理模式的区别 在软件设计中,装饰器模式(Decorator Pattern)和代理模式(Proxy Pattern)都是结构型设计模式,它们的目的都是通过对对象进行包装,来增加或改…...
[深度学习] 大模型学习1-大语言模型基础知识
大语言模型(Large Language Model,LLM)是一类基于Transformer架构的深度学习模型,主要用于处理与自然语言相关的各种任务。简单来说,当用户输入文本时,模型会生成相应的回复或结果。它能够完成许多任务&…...
GitLab集成Runner详细版--及注意事项汇总【最佳实践】
一、背景 看到网上很多用户提出的runner问题其实实际都不是问题,不过是因为对runner的一些细节不清楚导致了误解。本文不系统性的介绍GitLab-Runner,因为这类文章写得好的特别多,本文只汇总一些常几的问题/注意事项。旨在让新手少弯路。 二、…...
STM32-笔记34-4G遥控灯
4G接线 一、项目需求 服务器通过4G模块远程遥控开关灯。 二、项目实现 复制项目文件夹38-wifi控制风扇项目 重命名为39-4G遥控点灯 打开项目文件 加载文件 main.c #include "sys.h" #include "delay.h" #include "led.h" #include "ua…...
PHP 使用集合 处理复杂数据 提升开发效率
文章精选推荐 1 JetBrains Ai assistant 编程工具让你的工作效率翻倍 2 Extra Icons:JetBrains IDE的图标增强神器 3 IDEA插件推荐-SequenceDiagram,自动生成时序图 4 BashSupport Pro 这个ides插件主要是用来干嘛的 ? 5 IDEA必装的插件&…...
AI代码开发实践-微信小程序开发
接上回,本人参加了一次小孩学校组织的护学岗,萌生了开发一个微信小程序的水印相机的想法,说干就干。 最近也是在学习用AI编程,索性之前也用一点,今天就尝试一下 工具选择,环境搭建 阿里-通义灵码 通义灵…...
【网络安全 | 漏洞挖掘】私有项目中的账户接管过程
未经许可,不得转载。 正文 该程序包含多个通配符目标。在逐一搜索后,我最终发现了一个具有 P4 严重级别的 IDOR 漏洞(不正确的直接对象引用),允许我删除其他用户在帖子中的评论。 其中一个目标是一个只有单个域名的网站,提供注册、登录和重置密码功能。我尝试寻找任何可…...
【算法不挂科】算法期末考试【选择题专项练习】<多单元汇总>
前言 大家好吖,欢迎来到 YY 滴算法不挂科系列 ,热烈欢迎! 本章主要内容面向接触过C的老铁 主要内容含: 一.选择题 【1】算法绪论 1.算法与程序的区别是( ) A.输出 B.输入 C.确定性 D.有穷性 D 2.算法复杂度分析的两种基本方法…...
分布式消息中间件有哪些知名的产品
首先是Apache Kafka,它是一个分布式流处理平台,专注于高性能、低延迟的实时数据管道和流应用。Kafka通过分区和复制机制实现了高可用性和容错性,支持消息的持久化存储和高效的数据传输。其强大的生态系统还提供了丰富的客户端库和集成工具&am…...
kubernets基础入门
首先通过Kubernets架构图来认识Kubernets各个功能组件之间是怎么相互关联配合工作的。 一、Kubernets架构图: 通过上图来介绍集群功能组件通信详细过程: kubernetes api server 作为集群的核心,负责集群各功能模块之间的通信。集群内的各个功…...
【数据库初阶】MySQL数据类型
🎉博主首页: 有趣的中国人 🎉专栏首页: 数据库初阶 🎉其它专栏: C初阶 | C进阶 | 初阶数据结构 亲爱的小伙伴们,大家好!在这篇文章中,我们将深入浅出地为大家讲解 MySQL…...
(九千七-星河襟)椭圆曲线加密(ECC, Elliptic Curve Cryptography)及其例题
椭圆曲线加密(ECC)是一种基于椭圆曲线数学的公钥加密技术。它提供了一种高效的加密方法,能够在较小的密钥长度下实现与传统加密算法(如RSA)相同的安全级别。以下是ECC的主要特点和工作原理的总结: 1. 基本…...
LookingGlass使用
背景 Looking Glass 是一款开源应用程序,可以直接使用显卡直通的windows虚拟机。 常见环境是Linux hostwindows guest,基本部署结构图: 编译 git clone --recursive https://github.com/gnif/LookingGlass.git编译client mkdir client/b…...
ArcGIS Server 10.2授权文件过期处理
新的一年,arcgis server授权过期了,服务发不不了。查看ecp授权文件,原来的授权日期就到2024.12.31日。好吧,这里直接给出处理方法。 ArcGIS 10.2安装时,有的破解文件中会有含一个这样的注册程序,没有的话&…...
js es6 reduce函数, 通过规格生成sku
const specs [{ name: 颜色, values: [红色, 蓝色, 绿色] },{ name: 尺寸, values: [S, M, L] } ];function generateSKUs(specs) {return specs.reduce((acc, spec) > {const newAcc [];for (const combination of acc) {for (const value of spec.values) {newAcc.push(…...
Spring 事务底层原理
61 张图,剖析 Spring 事务,就是要钻到底! 拜托!面试请不要再问我 Transactional my: AOP Transactional PlatformTransactionManager:数据源隔离 TransactionInterceptor:拦截添加了注解的…...
ruoyi 分页 查询超出后还有数据; Mybatis-Plus 分页 超出后还有数据
修改:MybatisPlusConfig 类中 分页合理化修改为:paginationInnerInterceptor.setOverflow(false);...
基于Java的超级玛丽游戏的设计与实现【源码+文档+部署讲解】
目 录 1、绪论 1.1背景以及现状 1.2 Java语言的特点 1.3 系统运行环境及开发软件: 1.4 可行性的分析 1.4.1 技术可行性 1.4.2 经济可行性 1.4.3 操作可行性 2、 需求分析 2.1 用户需求分析 2.2功能需求分析 2.3界面设计需求分析…...
智慧工地信息管理与智能预警平台
建设背景与政策导向 智慧工地信息管理与智能预警平台的出现,源于工地管理面临的诸多挑战,如施工地点分散、危险区域多、监控手段落后等。随着政府对建筑产业现代化的积极推动,各地纷纷出台政策支持智慧工地的发展,旨在通过信息技…...
使用Apache Mahout制作 推荐引擎
目录 创建工程 基本概念 关键概念 基于用户与基于项目的分析 计算相似度的方法 协同过滤 基于内容的过滤 混合方法 创建一个推荐引擎 图书评分数据集 加载数据 从文件加载数据 从数据库加载数据 内存数据库 协同过滤 基于用户的过滤 基于项目的过滤 添加自定…...
记录:导出功能:接收文件流数据进行导出(vue3)
请求接口:一定要加responseType: blob 后端返回数据: api.js export function export() {return request({url: dev/api/export,method: get,responseType: blob,//一定要加}) } vue: import {export} from /api// 导出 const exportTab…...
GraphRAG vs 传统 RAG:如何通过知识图谱提升 AI 检索能力
相比传统 RAG 仅能独立检索文本片段的局限性,GraphRAG通过构建实体关系图谱实现了信息间的连接,让 AI 能更完整地理解和检索复杂的关联信息,从而生成更准确和连贯的回答 问题背景: 想象有一本详细记录某人(X)成就的传记,每个章节都描述了他的…...
问题清除指南|关于num_classes与 BCELoss、BCEWithLogitsLoss 和 CrossEntropyLoss 的关系
前言:关于「 num_classes 1 」引发的探究。 2024年尾声,学弟问到一个问题:在研究工作 CNNDetection 的github开源代码 networks/trainer.py 文件的 line 27 self.model resnet50(num_classes1) 中,变量 num_classes 的值为1&…...
组网实训实现
小型单元网络实现 IP划分: 外网:172.1.1.0/24 172.1.2.0/24 内网:基于192.168.3.0/24的子网划分 综合办公楼:192.168.3.00 000000 /26(192.168.3.0-192.168.3.63) 综合一楼:192.168.3.0000 0000 /28&…...
【DevOps】Jenkins部署
Jenkins部署 文章目录 Jenkins部署资源列表基础环境一、部署Gilab1.1、安装Gitlab1.2、修改配置文件1.3、加载配置文件1.4、访问Gitlab1.5、修改root登录密码1.6、创建demo测试项目1.7、上传代码1.8、验证上传的代码 二、部署Jenkins所需软件2.1、部署JDK2.2、部署Tomcat2.3、部…...
HTML——38.Span标签和字符实体
<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>span标签和字符实体</title><style type"text/css">h1{text-align: center;}p{text-indent: 2em;}span{color: red;}</style></head><…...
doris:基于 Arrow Flight SQL 的高速数据传输链路
Doris 基于 Arrow Flight SQL 协议实现了高速数据链路,支持多种语言使用 SQL 从 Doris 高速读取大批量数据。 用途 从 Doris 加载大批量数据到其他组件,如 Python/Java/Spark/Flink,可以使用基于 Arrow Flight SQL 的 ADBC/JDBC 替代过去…...
文献阅读 | B. S. Carmo 2010
目录 一、文献名称二、原文地址三、ABSTRACT主要发现详细观察分岔分析雷诺数依赖性比较见解意义结论 四、IINTRODUCTION历史研究回顾计算研究近期研究进展研究空白与目的论文结构 一、文献名称 二、原文地址 研究目的:研究串列排列双圆柱体周围流场中的次级不稳定性…...
GRAPE——RLAIF微调VLA模型:通过偏好对齐提升机器人策略的泛化能力(含24年具身模型汇总)
前言 24年具身前沿模型大汇总 过去的这两年,工作之余,我狂写大模型与具身的文章,加之具身大火,每周都有各种朋友通过CSDN私我及我司「七月在线」寻求帮助/指导(当然,也欢迎各大开发团队与我司合作共同交付)…...
超越YOLO11!DEIM:先进的实时DETR目标检测
DEIM: DETR with Improved Matching for Fast Convergence arXiv: https://arxiv.org/abs/2412.04234 Project webpage:https://www.shihuahuang.cn/DEIM/ GitHub:https://github.com/ShihuaHuang95/DEIM 1 背景:DETR目标检测框架 目标检…...
django vue3实现大文件分段续传(断点续传)
前端环境准备及目录结构: npm create vue 并取名为big-file-upload-fontend 通过 npm i 安装以下内容"dependencies": {"axios": "^1.7.9","element-plus": "^2.9.1","js-sha256": "^0.11.0&quo…...
用户注册模块(芒果头条项目进度4)
1 创建⽤户模块⼦应⽤ 1.1 在项⽬包⽬录下 创建apps的python包。 1.2 在apps包下 创建应⽤userapp $ cd 项⽬包⽬录/apps $ python ../../manage.py startapp userapp 1.3 配置导包路径 默认情况下导包路径指向项⽬根⽬录 # 通过下⾯语句可以打印当前导包路径 print(sys.pa…...
Java Map集合、集合的嵌套
一. 概述 1. Map集合称为双列集合,格式:{key1value1, key2value2,.....},一次需要存一对数据作为一个元素。 2. Map集合的每个元素"keyvalue"称为一个键值对/键值对对象/一个Entry对象,Map集合也被称为"键值对集合"。 3.…...
C#中使用系统默认应用程序打开文件
有时您可能希望程序使用默认应用程序打开文件。 例如,您可能希望显示 PDF 文件、网页或互联网上的 URL。 System.Diagnostics.Process类的Start方法启动系统与文件关联的应用程序。 例如,如果文件扩展名为.txt,则系统会在 NotePad、WordPa…...
论文泛读《LPFHE: Low-Complexity Polynomial CNNs for Secure Inference over FHE》
文章目录 1、摘要2、介绍3、文章结构4、总结 1、摘要 Machine learning as a service (MLaaS) 在客户中越来越受欢迎。为了解决 MLaaS 中的隐私问题,引入了 FHE 来保护客户端的数据。 然而,FHE 不能直接评估 卷积神经网络 (CNNs) 中的非算数激活函数。…...
基于Spring Boot的IT技术交流和分享平台的设计与实现源码
风定落花生,歌声逐流水,大家好我是风歌,混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的IT技术交流和分享平台的设计与实现。项目源码以及部署相关请联系风歌,文末附上联系信息 。 项目简介: 基于S…...
力扣hot100——二分查找
35. 搜索插入位置 class Solution { public:int searchInsert(vector<int>& a, int x) {if (a[0] > x) return 0;int l 0, r a.size() - 1;while (l < r) {int mid (l r 1) / 2;if (a[mid] < x) l mid;else r mid - 1;}if (a[l] x) return l;else …...
1月第一讲:WxPython跨平台开发框架之前后端结合实现附件信息的上传及管理
1、功能描述和界面 前端(wxPython GUI): 提供文件选择、显示文件列表的界面。支持上传、删除和下载附件。展示上传状态和附件信息(如文件名、大小、上传时间)。后端(REST API 服务):…...
uniapp:跳转第三方地图
1.跳转第三方高德地图 //跳转地图 toMap(item){uni.navigateTo({url: (window.location.href https://uri.amap.com/navigation?to${item.lng},${item.lat},${item.shopName}&modecar&policy1&srchttps://gawl.gazhcs.com/wap/index.html&callnative0)}) },…...
源码理解 UE4中的 FCookStatsManager::FAutoRegisterCallback RegisterCookStats
官方文档:https://dev.epicgames.com/documentation/zh-cn/unreal-engine/API/Runtime/Core/ProfilingDebugging/FCookStatsManager文档中的注释: When a cook a complete that is configured to use stats (ENABLE_COOK_STATS), it will broadcast this…...
QML Text详解
1. 概述 Text 是 QML 中用来显示文本的基本组件。它可以用于创建静态的标签、标题、说明文字等,支持多种文本格式、样式、颜色、对齐方式等配置。Text 组件也支持动态文本内容的显示,可以通过绑定数据源来实时更新文本内容。 Text 组件非常灵活&#x…...
详细讲一下Prettier对我们日常开发的作用,以及详细用法
1.什么是 Prettier? // Prettier 是代码格式化工具,它可以自动调整代码格式 // 比如把这样的代码: function foo ( a, b ){ return ab; }// 自动格式化成这样: function foo(a, b) {return a b; } 2.基础配置详解 {// 控制…...