物联网之对接MQTT最佳实践
小伙伴们,你们好呀,我是老寇,跟我一起学习对接MQTT
安装EMQX
采用docker-compose一键式,启动!!!
还没有安装docker朋友,参考文章下面两篇文章
# Ubuntu20.04安装Docker
# Centos7安装Docker 23.0.6
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
使用 emqx 5.4.1,按照老夫的教程来,请不要改版本号!!!
services:emqx:image: emqx/emqx:5.4.1container_name: emqx# 保持容器在没有守护程序的情况下运行tty: truerestart: alwaysprivileged: trueports:- "1883:1883"- "8083:8083"- "8883:8883"- "18083:18083"environment:- TZ=Asia/Shanghaivolumes:# 挂载数据存储- ./emqx/data:/opt/emqx/data# 挂载日志文件- ./emqx/log:/opt/emqx/lognetworks:- laokou_network
networks:laokou_network:driver: bridge
访问 http://127.0.0.1:18083 设置密码
EMQX MQTT【摘抄自官方文档】
EMQX官方文档
MQTT 是物联网 (IoT) 的 OASIS 标准消息传递协议。它被设计为一种极轻量的发布/订阅消息传输协议,非常适合以较小的代码占用空间和极低的网络带宽连接远程设备。MQTT 目前广泛应用于汽车、制造、电信、石油和天然气等众多行业。
EMQX 完全兼容 MQTT 5.0 和 3.x,本节将介绍 MQTT 相关功能的基本配置项,包括基本 MQTT 设置、订阅设置、会话设置、强制关闭设置和强制垃圾回收设置等
客户端对接
本文章采用三种客户端对接
维度 | Paho | Hivemq-MQTT-Client | Vert.x MQTT Client |
---|---|---|---|
协议支持 | MQTT 3.1.1(5.0 实验性) | MQTT 5.0 完整支持 | MQTT 5.0(较新版本) |
性能 | 中(同步模式) | 高(异步非阻塞) | 极高(响应式架构) |
依赖复杂度 | 低 | 中(仅 Netty) | 高(需 Vert.x 生态) |
社区资源 | 丰富 | 较少 | 中等 |
适用场景 | 传统 IoT、跨语言项目 | 企业级 MQTT 5.0、高吞吐 | 响应式系统、高并发微服务 |
Paho【不推荐,连接不稳定】
Paho代码地址
引入依赖
<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version></dependency><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
</dependencies>
项目集成
PahoProperties
/*** @author laokou*/
@Data
public class PahoProperties {private boolean auth = true;private String username = "emqx";private String password = "laokou123";private String host = "127.0.0.1";private int port = 1883;private String clientId;private int subscribeQos = 1;private int publishQos = 0;private int willQos = 1;private int connectionTimeout = 60;private boolean manualAcks = false;// @formatter:off/*** 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.* clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.* <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a>*/// @formatter:onprivate boolean clearStart = false;private int receiveMaximum = 10000;private int maximumPacketSize = 10000;// @formatter:off/*** 默认会话保留一天.* 最大值,4294967295L,会话过期时间【永不过期,单位秒】.* 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).*/private long sessionExpiryInterval = 86400L;// @formatter:on/*** 心跳包每隔60秒发一次.*/private int keepAliveInterval = 60;private boolean automaticReconnect = true;private Set<String> topics = new HashSet<>(0);}
PahoMqttClientMessageCallbackV5
/*** @author laokou*/
@Slf4j
@RequiredArgsConstructor
public class PahoMqttClientMessageCallbackV5 implements MqttCallback {private final List<MessageHandler> messageHandlers;@Overridepublic void disconnected(MqttDisconnectResponse disconnectResponse) {log.error("【Paho-V5】 => MQTT关闭连接");}@Overridepublic void mqttErrorOccurred(MqttException ex) {log.error("【Paho-V5】 => MQTT报错,错误信息:{}", ex.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(topic)) {log.info("【Paho-V5】 => MQTT接收到消息,Topic:{}", topic);messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));}}}@Overridepublic void deliveryComplete(IMqttToken token) {log.info("【Paho-V5】 => MQTT消息发送成功,消息ID:{}", token.getMessageId());}@Overridepublic void connectComplete(boolean reconnect, String uri) {if (reconnect) {log.info("【Paho-V5】 => MQTT重连成功,URI:{}", uri);}else {log.info("【Paho-V5】 => MQTT建立连接,URI:{}", uri);}}@Overridepublic void authPacketArrived(int reasonCode, MqttProperties properties) {log.info("【Paho-V5】 => 接收到身份验证数据包:{}", reasonCode);}}
PahoV5MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class PahoV5MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);PahoProperties pahoProperties = new PahoProperties();pahoProperties.setClientId("test-client-3");pahoProperties.setTopics(Set.of("/test-topic-3/#"));PahoMqttClientV5 pahoMqttClientV5 = new PahoMqttClientV5(pahoProperties, messageHandlers, scheduledExecutorService);pahoMqttClientV5.open();Thread.sleep(1000);pahoMqttClientV5.publish("/test-topic-3/789", "Hello World789".getBytes());}}
PahoMqttClientMessageCallbackV3
/*** @author laokou*/
@Slf4j
@RequiredArgsConstructor
public class PahoMqttClientMessageCallbackV3 implements MqttCallback {private final List<MessageHandler> messageHandlers;@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("【Paho-V3】 => MQTT消息发送成功,消息ID:{}", iMqttDeliveryToken.getMessageId());}@Overridepublic void connectionLost(Throwable throwable) {log.error("【Paho-V3】 => MQTT关闭连接");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(topic)) {log.info("【Paho-V3】 => MQTT接收到消息,Topic:{}", topic);messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));}}}
}
PahoV3MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class PahoV3MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);PahoProperties pahoProperties2 = new PahoProperties();pahoProperties2.setClientId("test-client-4");pahoProperties2.setTopics(Set.of("/test-topic-4/#"));PahoMqttClientV3 pahoMqttClientV3 = new PahoMqttClientV3(pahoProperties2, messageHandlers, scheduledExecutorService);pahoMqttClientV3.open();Thread.sleep(1000);pahoMqttClientV3.publish("/test-topic-4/000", "Hello World000".getBytes());}}
Hivemq-MQTT-Client【不推荐】
注意:订阅一段时间收不到数据,标准mqtt5.0协议,不兼容emqx broker mqtt5.0
Hivemq代码地址
引入依赖
<dependencies><dependency><groupId>com.hivemq</groupId><artifactId>hivemq-mqtt-client-reactor</artifactId><version>1.3.5</version></dependency><dependency><groupId>com.hivemq</groupId><artifactId>hivemq-mqtt-client-epoll</artifactId><version>1.3.5</version><type>pom</type></dependency>
<dependencies>
项目集成
HivemqProperties
/*** @author laokou*/
@Data
public class HivemqProperties {private boolean auth = true;private String username = "emqx";private String password = "laokou123";private String host = "127.0.0.1";private int port = 1883;private String clientId;private int subscribeQos = 1;private int publishQos = 0;private int willQos = 1;// @formatter:off/*** 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.* clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.* <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a>*/// @formatter:onprivate boolean clearStart = false;private int receiveMaximum = 10000;private int sendMaximum = 10000;private int maximumPacketSize = 10000;private int sendMaximumPacketSize = 10000;private int topicAliasMaximum = 1024;private int sendTopicAliasMaximum = 2048;private long messageExpiryInterval = 86400L;private boolean requestProblemInformation = true;private boolean requestResponseInformation = true;// @formatter:off/*** 默认会话保留一天.* 最大值,4294967295L,会话过期时间【永不过期,单位秒】.* 定义客户端断开后会话保留的时间(仅在 Clean Session = false 时生效).*/private long sessionExpiryInterval = 86400L;// @formatter:on/*** 心跳包每隔60秒发一次.*/private int keepAliveInterval = 60;private boolean automaticReconnect = true;private long automaticReconnectMaxDelay = 5;private long automaticReconnectInitialDelay = 1;private Set<String> topics = new HashSet<>(0);private int nettyThreads = 32;private boolean retain = false;private boolean noLocal = false;}
HivemqClientV5
/*** @author laokou*/
@Slf4j
public class HivemqClientV5 {/*** 响应主题.*/private final String RESPONSE_TOPIC = "response/topic";/*** 服务下线数据.*/private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8);/*** 相关数据.*/private final byte[] CORRELATION_DATA = "correlationData".getBytes(UTF_8);private final HivemqProperties hivemqProperties;private final List<MessageHandler> messageHandlers;private volatile Mqtt5RxClient client;private final Object lock = new Object();private volatile Disposable connectDisposable;private volatile Disposable subscribeDisposable;private volatile Disposable unSubscribeDisposable;private volatile Disposable publishDisposable;private volatile Disposable disconnectDisposable;private volatile Disposable consumeDisposable;public HivemqClientV5(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) {this.hivemqProperties = hivemqProperties;this.messageHandlers = messageHandlers;}public void open() {if (Objects.isNull(client)) {synchronized (lock) {if (Objects.isNull(client)) {client = getMqtt5ClientBuilder().buildRx();}}}connect();consume();}public void close() {if (!Objects.isNull(client)) {disconnectDisposable = client.disconnectWith().sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()).applyDisconnect().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(() -> log.info("【Hivemq-V5】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()),e -> log.error("【Hivemq-V5】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e));}}public void subscribe() {String[] topics = getTopics();subscribe(topics, getQosArray(topics));}public String[] getTopics() {return hivemqProperties.getTopics().toArray(String[]::new);}public int[] getQosArray(String[] topics) {return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray();}public void subscribe(String[] topics, int[] qosArray) {checkTopicAndQos(topics, qosArray);if (!Objects.isNull(client)) {List<Mqtt5Subscription> subscriptions = new ArrayList<>(topics.length);for (int i = 0; i < topics.length; i++) {subscriptions.add(Mqtt5Subscription.builder().topicFilter(topics[i]).qos(getMqttQos(qosArray[i])).retainAsPublished(hivemqProperties.isRetain()).noLocal(hivemqProperties.isNoLocal()).build());}subscribeDisposable = client.subscribeWith().addSubscriptions(subscriptions).applySubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log.error("【Hivemq-V5】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void unSubscribe() {String[] topics = hivemqProperties.getTopics().toArray(String[]::new);unSubscribe(topics);}public void unSubscribe(String[] topics) {checkTopic(topics);if (!Objects.isNull(client)) {List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length);for (String topic : topics) {matchedTopics.add(MqttTopicFilter.of(topic));}unSubscribeDisposable = client.unsubscribeWith().addTopicFilters(matchedTopics).applyUnsubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log.error("【Hivemq-V5】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void publish(String topic, byte[] payload, int qos) {if (!Objects.isNull(client)) {publishDisposable = client.publish(Flowable.just(Mqtt5Publish.builder().topic(topic).qos(getMqttQos(qos)).payload(payload).noMessageExpiry().retain(hivemqProperties.isRetain()).messageExpiryInterval(hivemqProperties.getMessageExpiryInterval()).correlationData(CORRELATION_DATA).payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8).contentType("text/plain").responseTopic(RESPONSE_TOPIC).build())).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT消息发布成功,topic:{}", topic),e -> log.error("【Hivemq-V5】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e));}}public void publish(String topic, byte[] payload) {publish(topic, payload, hivemqProperties.getPublishQos());}public void dispose(Disposable disposable) {if (!Objects.isNull(disposable) && !disposable.isDisposed()) {// 显式取消订阅disposable.dispose();}}public void dispose() {dispose(connectDisposable);dispose(subscribeDisposable);dispose(unSubscribeDisposable);dispose(publishDisposable);dispose(consumeDisposable);dispose(disconnectDisposable);}public void reSubscribe() {log.info("【Hivemq-V5】 => MQTT重新订阅开始");dispose(subscribeDisposable);subscribe();log.info("【Hivemq-V5】 => MQTT重新订阅结束");}private MqttQos getMqttQos(int qos) {return MqttQos.fromCode(qos);}private void connect() {connectDisposable = client.connectWith().keepAlive(hivemqProperties.getKeepAliveInterval()).cleanStart(hivemqProperties.isClearStart()).sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval()).willPublish().topic("will/topic").payload(WILL_PAYLOAD).qos(getMqttQos(hivemqProperties.getWillQos())).retain(true).messageExpiryInterval(100).delayInterval(10).payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8).contentType("text/plain").responseTopic(RESPONSE_TOPIC).correlationData(CORRELATION_DATA).applyWillPublish().restrictions().receiveMaximum(hivemqProperties.getReceiveMaximum()).sendMaximum(hivemqProperties.getSendMaximum()).maximumPacketSize(hivemqProperties.getMaximumPacketSize()).sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize()).topicAliasMaximum(hivemqProperties.getTopicAliasMaximum()).sendTopicAliasMaximum(hivemqProperties.getSendTopicAliasMaximum()).requestProblemInformation(hivemqProperties.isRequestProblemInformation()).requestResponseInformation(hivemqProperties.isRequestResponseInformation()).applyRestrictions().applyConnect().toFlowable().firstElement().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V5】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(),hivemqProperties.getPort(), hivemqProperties.getClientId()),e -> log.error("【Hivemq-V5】 => MQTT连接失败,错误信息:{}", e.getMessage(), e));}private void consume() {if (!Objects.isNull(client)) {consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL).onBackpressureBuffer(8192).observeOn(Schedulers.computation(), false, 8192).doOnSubscribe(subscribe -> {log.info("【Hivemq-V5】 => MQTT开始订阅消息,请稍候。。。。。。");reSubscribe();}).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(publish -> {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(publish.getTopic().toString())) {log.info("【Hivemq-V5】 => MQTT接收到消息,Topic:{}", publish.getTopic());messageHandler.handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString()));}}}, e -> log.error("【Hivemq-V5】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e),() -> log.info("【Hivemq-V5】 => MQTT订阅消息结束,请稍候。。。。。。"));}}private Mqtt5ClientBuilder getMqtt5ClientBuilder() {Mqtt5ClientBuilder builder = Mqtt5Client.builder().addConnectedListener(listener -> {Optional<? extends MqttClientConnectionConfig> config = Optional.of(listener.getClientConfig().getConnectionConfig()).get();config.ifPresent(mqttClientConnectionConfig -> log.info("【Hivemq-V5】 => MQTT连接保持时间:{}ms",mqttClientConnectionConfig.getKeepAlive()));log.info("【Hivemq-V5】 => MQTT已连接,客户端ID:{}", hivemqProperties.getClientId());}).addDisconnectedListener(listener -> log.error("【Hivemq-V5】 => MQTT已断开连接,客户端ID:{}", hivemqProperties.getClientId())).identifier(hivemqProperties.getClientId()).serverHost(hivemqProperties.getHost()).serverPort(hivemqProperties.getPort()).executorConfig(MqttClientExecutorConfig.builder().nettyExecutor(ThreadUtils.newVirtualTaskExecutor()).nettyThreads(hivemqProperties.getNettyThreads()).applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor())).build());// 开启重连if (hivemqProperties.isAutomaticReconnect()) {builder.automaticReconnect().initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS).maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS).applyAutomaticReconnect();}if (hivemqProperties.isAuth()) {builder.simpleAuth().username(hivemqProperties.getUsername()).password(hivemqProperties.getPassword().getBytes()).applySimpleAuth();}return builder;}private void checkTopicAndQos(String[] topics, int[] qosArray) {if (topics == null || qosArray == null) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays cannot be null");}if (topics.length != qosArray.length) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays must have the same length");}if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be empty");}}private void checkTopic(String[] topics) {if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be empty");}}}
HivemqV5MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class HivemqV5MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {HivemqProperties hivemqProperties = new HivemqProperties();hivemqProperties.setClientId("test-client-1");hivemqProperties.setTopics(Set.of("/test-topic-1/#"));HivemqClientV5 hivemqClientV5 = new HivemqClientV5(hivemqProperties, messageHandlers);hivemqClientV5.open();hivemqClientV5.publish("/test-topic-1/123", "Hello World123".getBytes());}}
HivemqClientV3
/*** @author laokou*/
@Slf4j
public class HivemqClientV3 {/*** 服务下线数据.*/private final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8);private final HivemqProperties hivemqProperties;private final List<MessageHandler> messageHandlers;private volatile Mqtt3RxClient client;private final Object lock = new Object();private volatile Disposable connectDisposable;private volatile Disposable subscribeDisposable;private volatile Disposable unSubscribeDisposable;private volatile Disposable publishDisposable;private volatile Disposable disconnectDisposable;private volatile Disposable consumeDisposable;public HivemqClientV3(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) {this.hivemqProperties = hivemqProperties;this.messageHandlers = messageHandlers;}public void open() {if (Objects.isNull(client)) {synchronized (lock) {if (Objects.isNull(client)) {client = getMqtt3ClientBuilder().buildRx();}}}connect();consume();}public void close() {if (!Objects.isNull(client)) {disconnectDisposable = client.disconnect().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(() -> log.info("【Hivemq-V3】 => MQTT断开连接成功,客户端ID:{}", hivemqProperties.getClientId()),e -> log.error("【Hivemq-V3】 => MQTT断开连接失败,错误信息:{}", e.getMessage(), e));}}public void subscribe() {String[] topics = getTopics();subscribe(topics, getQosArray(topics));}public String[] getTopics() {return hivemqProperties.getTopics().toArray(String[]::new);}public int[] getQosArray(String[] topics) {return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray();}public void subscribe(String[] topics, int[] qosArray) {checkTopicAndQos(topics, qosArray);if (!Objects.isNull(client)) {List<Mqtt3Subscription> subscriptions = new ArrayList<>(topics.length);for (int i = 0; i < topics.length; i++) {subscriptions.add(Mqtt3Subscription.builder().topicFilter(topics[i]).qos(getMqttQos(qosArray[i])).build());}subscribeDisposable = client.subscribeWith().addSubscriptions(subscriptions).applySubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V3】 => MQTT订阅成功,主题: {}", String.join("、", topics)), e -> log.error("【Hivemq-V3】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void unSubscribe() {String[] topics = hivemqProperties.getTopics().toArray(String[]::new);unSubscribe(topics);}public void unSubscribe(String[] topics) {checkTopic(topics);if (!Objects.isNull(client)) {List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length);for (String topic : topics) {matchedTopics.add(MqttTopicFilter.of(topic));}unSubscribeDisposable = client.unsubscribeWith().addTopicFilters(matchedTopics).applyUnsubscribe().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(() -> log.info("【Hivemq-V3】 => MQTT取消订阅成功,主题:{}", String.join("、", topics)), e -> log.error("【Hivemq-V3】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), e.getMessage(), e));}}public void publish(String topic, byte[] payload, int qos) {if (!Objects.isNull(client)) {publishDisposable = client.publish(Flowable.just(Mqtt3Publish.builder().topic(topic).qos(getMqttQos(qos)).payload(payload).retain(hivemqProperties.isRetain()).build())).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V3】 => MQTT消息发布成功,topic:{}", topic),e -> log.error("【Hivemq-V3】 => MQTT消息发布失败,topic:{},错误信息:{}", topic, e.getMessage(), e));}}public void publish(String topic, byte[] payload) {publish(topic, payload, hivemqProperties.getPublishQos());}public void dispose(Disposable disposable) {if (!Objects.isNull(disposable) && !disposable.isDisposed()) {// 显式取消订阅disposable.dispose();}}public void dispose() {dispose(connectDisposable);dispose(subscribeDisposable);dispose(unSubscribeDisposable);dispose(publishDisposable);dispose(consumeDisposable);dispose(disconnectDisposable);}public void reSubscribe() {log.info("【Hivemq-V3】 => MQTT重新订阅开始");dispose(subscribeDisposable);subscribe();log.info("【Hivemq-V3】 => MQTT重新订阅结束");}private MqttQos getMqttQos(int qos) {return MqttQos.fromCode(qos);}private void connect() {connectDisposable = client.connectWith().keepAlive(hivemqProperties.getKeepAliveInterval()).willPublish().topic("will/topic").payload(WILL_PAYLOAD).qos(getMqttQos(hivemqProperties.getWillQos())).retain(true).applyWillPublish().restrictions().sendMaximum(hivemqProperties.getSendMaximum()).sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize()).applyRestrictions().applyConnect().toFlowable().firstElement().subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(ack -> log.info("【Hivemq-V3】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", hivemqProperties.getHost(),hivemqProperties.getPort(), hivemqProperties.getClientId()),e -> log.error("【Hivemq-V3】 => MQTT连接失败,错误信息:{}", e.getMessage(), e));}private void consume() {if (!Objects.isNull(client)) {consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL).onBackpressureBuffer(8192).observeOn(Schedulers.computation(), false, 8192).doOnSubscribe(subscribe -> {log.info("【Hivemq-V3】 => MQTT开始订阅消息,请稍候。。。。。。");reSubscribe();}).subscribeOn(Schedulers.io()).retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1).takeWhile(retryCount -> retryCount != -1).flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS))).subscribe(publish -> {for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(publish.getTopic().toString())) {log.info("【Hivemq-V3】 => MQTT接收到消息,Topic:{}", publish.getTopic());messageHandler.handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString()));}}}, e -> log.error("【Hivemq-V3】 => MQTT消息处理失败,错误信息:{}", e.getMessage(), e),() -> log.info("【Hivemq-V3】 => MQTT订阅消息结束,请稍候。。。。。。"));}}private Mqtt3ClientBuilder getMqtt3ClientBuilder() {Mqtt3ClientBuilder builder = Mqtt3Client.builder().addConnectedListener(listener -> {Optional<? extends MqttClientConnectionConfig> config = Optional.of(listener.getClientConfig().getConnectionConfig()).get();config.ifPresent(mqttClientConnectionConfig -> log.info("【Hivemq-V5】 => MQTT连接保持时间:{}ms",mqttClientConnectionConfig.getKeepAlive()));log.info("【Hivemq-V3】 => MQTT已连接,客户端ID:{}", hivemqProperties.getClientId());}).addDisconnectedListener(listener -> log.error("【Hivemq-V3】 => MQTT已断开连接,客户端ID:{}", hivemqProperties.getClientId())).identifier(hivemqProperties.getClientId()).serverHost(hivemqProperties.getHost()).serverPort(hivemqProperties.getPort()).executorConfig(MqttClientExecutorConfig.builder().nettyExecutor(ThreadUtils.newVirtualTaskExecutor()).nettyThreads(hivemqProperties.getNettyThreads()).applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor())).build());// 开启重连if (hivemqProperties.isAutomaticReconnect()) {builder.automaticReconnect().initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS).maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS).applyAutomaticReconnect();}if (hivemqProperties.isAuth()) {builder.simpleAuth().username(hivemqProperties.getUsername()).password(hivemqProperties.getPassword().getBytes()).applySimpleAuth();}return builder;}private void checkTopicAndQos(String[] topics, int[] qosArray) {if (topics == null || qosArray == null) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays cannot be null");}if (topics.length != qosArray.length) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays must have the same length");}if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be empty");}}private void checkTopic(String[] topics) {if (topics.length == 0) {throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be empty");}}}
HivemqV3MqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class HivemqV3MqttClientTest {private final List<MessageHandler> messageHandlers;@Testvoid testMqttClient() throws InterruptedException {HivemqProperties hivemqProperties2 = new HivemqProperties();hivemqProperties2.setClientId("test-client-2");hivemqProperties2.setTopics(Set.of("/test-topic-2/#"));HivemqClientV3 hivemqClientV3 = new HivemqClientV3(hivemqProperties2, messageHandlers);hivemqClientV3.open();hivemqClientV3.publish("/test-topic-2/456", "Hello World456".getBytes());}}
Vert.x MQTT Client【推荐,只兼容mqtt3.1.1】
# Vert.x MQTT文档
引入依赖
<dependencies><dependency><groupId>io.vertx</groupId><artifactId>vertx-mqtt</artifactId><version>4.5.14</version></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.7.5</version></dependency>
</dependencies>
项目集成
MqttClientProperties
/*** @author laokou*/
@Data
public class MqttClientProperties {private boolean auth = true;private String username = "emqx";private String password = "laokou123";private String host = "127.0.0.1";private int port = 1883;private String clientId = UUIDGenerator.generateUUID();// @formatter:off/*** 控制是否创建新会话(true=新建,false=复用历史会话). clearStart=true => Broker 会在连接断开后立即清除所有会话信息.* clearStart=false => Broker 会在连接断开后保存会话信息,并在重新连接后复用会话信息.*/// @formatter:onprivate boolean clearSession = false;private int receiveBufferSize = Integer.MAX_VALUE;private int maxMessageSize = -1;/*** 心跳包每隔60秒发一次.*/private int keepAliveInterval = 60;private boolean autoKeepAlive = true;private long reconnectInterval = 1000;private int reconnectAttempts = Integer.MAX_VALUE;private Map<String, Integer> topics = new HashMap<>(0);private int willQos = 1;private boolean willRetain = false;private int ackTimeout = -1;private boolean autoAck = true;/*** 服务下线主题.*/private String willTopic = "/will";/*** 服务下线数据.*/private String willPayload = "offline";}
VertxConfig
/*** @author laokou*/
@Configuration
public class VertxConfig {@Beanpublic Vertx vertx() {VertxOptions vertxOptions = new VertxOptions();vertxOptions.setMaxEventLoopExecuteTime(60);vertxOptions.setMaxWorkerExecuteTime(60);vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);vertxOptions.setPreferNativeTransport(true);return Vertx.vertx(vertxOptions);}}
VertxMqttClient
注意:vertx-mqtt不支持客户端自动断线重连,网络不通畅或连接关闭,需要自己手动调用连接!!!实现这个重连的功能
/*** @author laokou*/
@Slf4j
public class VertxMqttClient {private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many().multicast().onBackpressureBuffer(Integer.MAX_VALUE, false);private final MqttClient mqttClient;private final Vertx vertx;private final MqttClientProperties mqttClientProperties;private final List<MessageHandler> messageHandlers;private final List<Disposable> disposables;private final AtomicBoolean isConnected = new AtomicBoolean(false);private final AtomicBoolean isLoaded = new AtomicBoolean(false);private final AtomicBoolean isReconnected = new AtomicBoolean(true);public VertxMqttClient(final Vertx vertx, final MqttClientProperties mqttClientProperties,final List<MessageHandler> messageHandlers) {this.vertx = vertx;this.mqttClientProperties = mqttClientProperties;this.mqttClient = MqttClient.create(vertx, getOptions());this.messageHandlers = messageHandlers;this.disposables = Collections.synchronizedList(new ArrayList<>());}public void open() {mqttClient.closeHandler(v -> {isConnected.set(false);log.error("【Vertx-MQTT】 => MQTT连接断开,客户端ID:{}", mqttClientProperties.getClientId());reconnect();}).publishHandler(messageSink::tryEmitNext)// 仅接收QoS1和QoS2的消息.publishCompletionHandler(id -> {// log.info("【Vertx-MQTT】 => 接收MQTT的PUBACK或PUBCOMP数据包,数据包ID:{}", id);}).subscribeCompletionHandler(ack -> {// log.info("【Vertx-MQTT】 => 接收MQTT的SUBACK数据包,数据包ID:{}", ack.messageId());}).unsubscribeCompletionHandler(id -> {// log.info("【Vertx-MQTT】 => 接收MQTT的UNSUBACK数据包,数据包ID:{}", id);}).pingResponseHandler(s -> {// log.info("【Vertx-MQTT】 => 接收MQTT的PINGRESP数据包");}).connect(mqttClientProperties.getPort(), mqttClientProperties.getHost(), connectResult -> {if (connectResult.succeeded()) {isConnected.set(true);log.info("【Vertx-MQTT】 => MQTT连接成功,主机:{},端口:{},客户端ID:{}", mqttClientProperties.getHost(),mqttClientProperties.getPort(), mqttClientProperties.getClientId());resubscribe();}else {isConnected.set(false);Throwable ex = connectResult.cause();log.error("【Vertx-MQTT】 => MQTT连接失败,原因:{},客户端ID:{}", ex.getMessage(),mqttClientProperties.getClientId(), ex);reconnect();}});}public void close() {disconnect();}/*** Sends the PUBLISH message to the remote MQTT server.* @param topic topic on which the message is published* @param payload message payload* @param qos QoS level* @param isDup if the message is a duplicate* @param isRetain if the message needs to be retained*/public void publish(String topic, int qos, String payload, boolean isDup, boolean isRetain) {mqttClient.publish(topic, Buffer.buffer(payload), convertQos(qos), isDup, isRetain);}private void reconnect() {if (isReconnected.get()) {log.info("【Vertx-MQTT】 => MQTT尝试重连");vertx.setTimer(mqttClientProperties.getReconnectInterval(),handler -> ThreadUtils.newVirtualTaskExecutor().execute(this::open));}}private void subscribe() {Map<String, Integer> topics = mqttClientProperties.getTopics();checkTopicAndQos(topics);mqttClient.subscribe(topics, subscribeResult -> {if (subscribeResult.succeeded()) {log.info("【Vertx-MQTT】 => MQTT订阅成功,主题: {}", String.join("、", topics.keySet()));}else {Throwable ex = subscribeResult.cause();log.error("【Vertx-MQTT】 => MQTT订阅失败,主题:{},错误信息:{}", String.join("、", topics.keySet()), ex.getMessage(),ex);}});}private void resubscribe() {if (isConnected.get() || mqttClient.isConnected()) {ThreadUtils.newVirtualTaskExecutor().execute(this::subscribe);}if (isLoaded.compareAndSet(false, true)) {ThreadUtils.newVirtualTaskExecutor().execute(this::consume);}}private void consume() {Disposable disposable = messageSink.asFlux().doOnNext(mqttPublishMessage -> {String topic = mqttPublishMessage.topicName();log.info("【Vertx-MQTT】 => MQTT接收到消息,Topic:{}", topic);for (MessageHandler messageHandler : messageHandlers) {if (messageHandler.isSubscribe(topic)) {messageHandler.handle(new MqttMessage(mqttPublishMessage.payload(), topic));}}}).subscribeOn(Schedulers.boundedElastic()).subscribe();disposables.add(disposable);}private void disposable() {for (Disposable disposable : disposables) {if (ObjectUtils.isNotNull(disposable) && !disposable.isDisposed()) {disposable.dispose();}}}private void disconnect() {isReconnected.set(false);mqttClient.disconnect(disconnectResult -> {if (disconnectResult.succeeded()) {disposable();log.info("【Vertx-MQTT】 => MQTT断开连接成功");disposables.clear();}else {Throwable ex = disconnectResult.cause();log.error("【Vertx-MQTT】 => MQTT断开连接失败,错误信息:{}", ex.getMessage(), ex);}});}private void unsubscribe(List<String> topics) {checkTopic(topics);mqttClient.unsubscribe(topics, unsubscribeResult -> {if (unsubscribeResult.succeeded()) {log.info("【Vertx-MQTT】 => MQTT取消订阅成功,主题:{}", String.join("、", topics));}else {Throwable ex = unsubscribeResult.cause();log.error("【Vertx-MQTT】 => MQTT取消订阅失败,主题:{},错误信息:{}", String.join("、", topics), ex.getMessage(), ex);}});}private MqttClientOptions getOptions() {MqttClientOptions options = new MqttClientOptions();options.setClientId(mqttClientProperties.getClientId());options.setCleanSession(mqttClientProperties.isClearSession());options.setAutoKeepAlive(mqttClientProperties.isAutoKeepAlive());options.setKeepAliveInterval(mqttClientProperties.getKeepAliveInterval());options.setReconnectAttempts(mqttClientProperties.getReconnectAttempts());options.setReconnectInterval(mqttClientProperties.getReconnectInterval());options.setWillQoS(mqttClientProperties.getWillQos());options.setWillTopic(mqttClientProperties.getWillTopic());options.setAutoAck(mqttClientProperties.isAutoAck());options.setAckTimeout(mqttClientProperties.getAckTimeout());options.setWillRetain(mqttClientProperties.isWillRetain());options.setWillMessageBytes(Buffer.buffer(mqttClientProperties.getWillPayload()));options.setReceiveBufferSize(mqttClientProperties.getReceiveBufferSize());options.setMaxMessageSize(mqttClientProperties.getMaxMessageSize());if (mqttClientProperties.isAuth()) {options.setPassword(mqttClientProperties.getPassword());options.setUsername(mqttClientProperties.getUsername());}return options;}private void checkTopicAndQos(Map<String, Integer> topics) {topics.forEach((topic, qos) -> {if (StringUtils.isEmpty(topic) || ObjectUtils.isNull(qos)) {throw new IllegalArgumentException("【Vertx-MQTT】 => Topic and QoS cannot be null");}});}private void checkTopic(List<String> topics) {if (CollectionUtils.isEmpty(topics)) {throw new IllegalArgumentException("【Vertx-MQTT】 => Topics list cannot be empty");}}private MqttQoS convertQos(int qos) {return MqttQoS.valueOf(qos);}}
VertxMqttClientTest
/*** @author laokou*/
@SpringBootTest
@RequiredArgsConstructor
@ContextConfiguration(classes = { DefaultMessageHandler.class, VertxConfig.class })
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class VertxMqttClientTest {private final List<MessageHandler> messageHandlers;private final Vertx vertx;@Testvoid testMqttClient() throws InterruptedException {MqttClientProperties properties = new MqttClientProperties();properties.setHost("127.0.0.1");properties.setPort(1883);properties.setUsername("emqx");properties.setPassword("laokou123");properties.setClientId("test-client-1");properties.setTopics(Map.of("/test-topic-1/#", 1));VertxMqttClient vertxMqttClient = new VertxMqttClient(vertx, properties, messageHandlers);Assertions.assertDoesNotThrow(vertxMqttClient::open);Thread.sleep(500);Assertions.assertDoesNotThrow(() -> vertxMqttClient.publish("/test-topic-1/test", 1, "test", false, false));Thread.sleep(500);Assertions.assertDoesNotThrow(vertxMqttClient::close);Thread.sleep(500);}}
详细代码请点击
非常推荐使用vertx-mqtt,项目平稳运行好用!!!
但是,需要时注意的是,项目部署到Linux系统,需要最少分配 -Xmx2100m -Xms2100m 内存,不然连接会关闭!
我是老寇,我们下次再见啦~
相关文章:
物联网之对接MQTT最佳实践
小伙伴们,你们好呀,我是老寇,跟我一起学习对接MQTT 安装EMQX 采用docker-compose一键式,启动!!! 还没有安装docker朋友,参考文章下面两篇文章 # Ubuntu20.04安装Docker # Cento…...
基于C++实现的深度学习(cnn/svm)分类器Demo
1. 项目简介 本项目是一个基于C实现的深度学习与传统机器学习结合的分类器Demo,主要流程为: 从CSV文件读取样本数据用卷积神经网络(CNN)进行特征提取用支持向量机(SVM)进行最终分类支持模型的保存与加载提…...
探寻适用工具:AI+3D 平台与工具的关键能力及选型考量 (AI+3D 产品经理笔记 S2E03)
引言:从技术光谱到落地选择的桥梁 在前两篇笔记中,我们首先(S2E01)宏观地探讨了 AI 生成 3D 技术兴起的驱动力、核心价值与面临的挑战,随后(S2E02)深入辨析了 Text-to-3D、Image-to-3D、NeRF 等…...
软考 系统架构设计师系列知识点 —— 黑盒测试与白盒测试(1)
本文内容参考: 黑盒测试和白盒测试详解-CSDN博客 软件测试中的各种覆盖(Coverage)详解-CSDN博客 特此致谢! 零、概述 黑盒测试又名为功能测试,主要目的是发现软件设计的需求或者是软件设计规格说明书中的错误缺陷。…...
配变运行检测:算法与实现逻辑
在现代电网系统中,配电变压器(简称配变)作为电力分配的关键设备,其运行状态的稳定与否直接关系到整个电网的供电质量和可靠性。配变运行检测通过实时监测和分析配变的各项运行参数,及时发现潜在故障隐患,为…...
brpc 安装及使用
介绍 brpc(Baidu Remote Procedure Call)是百度开源的一个高性能、通用的 RPC(远程过程调用)框架,其目标是让使用者能轻松构建高并发、分布式的应用程序。以下从多个方面详细介绍brpc: 核心特性 高性能 …...
ComfyUI学习笔记,案例四:inpaint
背景 ComfyUI学习笔记,案例四:inpaint,就是将一张图抠掉一块区域后还原,或者在一个图上重绘某个区域,感觉还是比较简单的。 它包含四个案例: inpaint_example,正向提示词 closeup photograph …...
【C++】智能指针RALL实现shared_ptr
个人主页 : zxctscl 专栏 【C】、 【C语言】、 【Linux】、 【数据结构】、 【算法】 如有转载请先通知 文章目录 1. 为什么需要智能指针?2. 内存泄漏2.1 什么是内存泄漏,内存泄漏的危害2.2 内存泄漏分类(了解)2.3 如何…...
利用迁移学习实现食物分类:基于PyTorch与ResNet18的实战案例
利用迁移学习实现食物分类:基于PyTorch与ResNet18的实战案例 在深度学习领域,训练一个高性能的模型往往需要大量的数据和计算资源。然而,通过迁移学习,我们能够巧妙地利用在大规模数据集上预训练好的模型,将其知识迁移…...
列日-巴斯通-列日:与VELO Senso TT+见证精彩时刻
近日,第111届列日-巴斯通-列日自行车赛落下帷幕,波加查毫无悬念地再度单飞夺冠。这场赛事不仅是速度与耐力的较量,更是装备与技术的完美结合。 在2025年第111届列日-巴斯通-列日自行车赛中,波加查以绝对优势再度单飞夺冠&a…...
C++笔记之委托
C++笔记之委托 code review! 文章目录 C++笔记之委托一、什么是委托?二、委托的常见应用场景2.1 事件委托(Event Delegation)2.2 C# 的委托类型(Delegate)2.3 对象组合中的委托(Design Delegation Pattern)三、C++ 委托模式示例四、委托的优点五、委托与23种设计模式的…...
Windows11 VS code 安装 Cline 调用 Github MCP 配置过程坑点汇总
背景 为了调研 MCP 在 windows 上如何使用本地的命令执行一些操作而实现自动化的过程,在 B 站视频的指导下,进行相应填坑过程,最终运行起来,并实现 github 自动化编程并提交代码的过程。 B 站 Cline 视频演示 Cline Cline 是一…...
SpringCloud多环境配置的一些问题
一、配置优先级(高到低) 命令行参数bootstrap.yaml/propertiesnacos配置config/applicaion.properties > config/applicaion.yml > config/applicaion.yamlapplicaion.properties > applicaion.yml > applicaion.yaml 有环境配置的会覆盖基础配置5的重复项&#…...
多语言笔记系列:Polyglot Notebooks 中运行 BenchmarkDotnet 基准测试
运行 BenchmarkDotnet 基准测试 在多语言笔记中,可以很方便的使用 BenchmarkDotnet 进行基准测试。 使用步骤 1. 安装 BenchmarkDotNet 包 // 默认包源 #i "nuget:https://api.nuget.org/v3/index.json"#r "nuget: BenchmarkDotNet, 0.13.12&quo…...
Model Context Protocol (MCP)笔记
目录 摘要MCP理论MCP的作用MCP 传输机制 Stdio 与 SSESTDIOSSE 传输部署模式 模型是如何确定工具的选用的?Fc x MCP x LangChain MCP快速开始编写客户端基于golang的mcp 摘要 Model Context Protocol(MCP,模型上下文协议)是由 An…...
【codeforces 2070c】二分答案详解
【codeforces 2070c】二分答案详解 二分答案转化成判定 对于任何问题,如果我们有了一个判定算法,那把解空间枚举并判定一遍,当然就可以得到解了。而当解空间具有单调性时,我们就可以使用二分法代替枚举。 考虑如下问题…...
启发式算法-禁忌搜索算法
禁忌搜索是一种可以用于解决组合优化问题的启发式算法,通过引入记忆机制跳出局部最优,避免重复搜索。该算法从一个初始解开始,通过邻域搜索策略来寻找当前解的邻域解,并在邻域解中选择一个最优解作为下一次迭代的当前解࿰…...
simulink 外循环与内循环执行流程
目录 前言 一、外循环 模型 执行流程 二、内循环 模型 执行流程 仓库 前言 某些需求需要使用到simulink外循环和内循环,本篇通过对其执行顺序进行记录,以便后续查阅。 一、外循环 模型 下面是我搭建的简单模型 执行流程 0-step:执行en step…...
Gradio全解20——Streaming:流式传输的多媒体应用(6)——构建视频流目标检测系统
Gradio全解20——Streaming:流式传输的多媒体应用(6)——构建视频流目标检测系统 本篇摘要20. Streaming:流式传输的多媒体应用20.6 RT-DETR模型构建视频流目标检测系统20.6.1 RT-DETR模型1. 模型介绍2. 使用示例 20.6.2 系统配置…...
比较两种判断相同二叉树的方法:递归与遍历序列对比
在二叉树操作中,判断两棵树是否相同是一个常见的问题。本文将对比两种不同的解决方案:递归法和遍历序列对比法,分析它们的优缺点,并探讨为何递归法是更优的选择。 问题描述 给定两棵二叉树的根节点 p 和 q,判断它们是…...
Java IO流核心处理方式详解
一、IO流概述 Java IO(Input/Output)流是处理输入输出操作的核心机制,通过流(Stream)的形式实现设备间的数据传输。所有操作都基于以下两个核心抽象: InputStream/OutputStream:字节流基类 Re…...
C++竞赛指南
关注支持,好运连连 目录 关注支持,好运连连 一、竞赛C核心优势 二、必备语法与STL组件 1. 输入输出优化 2. 常用STL容器 3. 算法函数 三、竞赛常用算法 1. 时间复杂度分析 2. 高频算法模板 二分查找 快速幂(模运算) …...
Python字符串全面指南:从基础到高级操作
字符串是Python编程中最基础也是最重要的数据类型之一。本文将全面介绍Python字符串的相关知识,从基础概念到高级操作,帮助您彻底掌握字符串的使用。 1. 字符串基础 1.1 字符串的概念 字符串是由一系列字符组成的不可变序列容器,存储的是字…...
【推荐】智慧矿山矿业信息化智能化资料汇总-共25份
智慧矿山矿业信息化智能化资料汇总 25 份: 有色金属矿山智能化采选生产线智能矿山建设与示范智能矿山建设实践与思考智慧矿山建设解决方案与实现途径以信息化、智能化为手段打造生态型、效益型国际一流示范矿山新型智能 X 荧光多通道高精度在线品位分析仪的研制与应…...
Oracle OCP认证考试考点详解083系列08
题记: 本系列主要讲解Oracle OCP认证考试考点(题目),适用于19C/21C,跟着学OCP考试必过。 36. 第36题: 题目 解析及答案: 关于数据库闪回(FLASHBACK DATABASE)功能,以下…...
备战蓝桥杯国赛第一天-atcoder-beginner-contest404
B. 因为只有四种情况,旋转90/180/270度后替换,直接替换,暴力即可 C. 循环图的定义是每个点出度为2,而且只有一个环的,所以先判断出度,再判断是否成环 #include <bits/stdc.h> using namespace st…...
Python异步编程进阶:深入探索asyncio高级特性
异步上下文管理器 (async with) 异步上下文管理器允许你在异步环境中管理资源,比如数据库连接或文件操作。 基本实现 class AsyncDatabaseConnection:async def __aenter__(self):print("建立数据库连接")await asyncio.sleep(0.5) # 模拟连接建立return selfas…...
【Java ee初阶】多线程(7)
一、线程池 线程池的一些参数: corePoolSize:核心线程数量 maximumPoolSize:核心线程数量临时线程数量 上述是“java 的线程池策略”(其他语言,其他库的线程池可能不同) keepAliveTime :临时线程的存活时间.临时线程…...
【PostgreSQL数据分析实战:从数据清洗到可视化全流程】6.2 预测分析基础(线性回归/逻辑回归实现)
👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 PostgreSQL数据分析实战:预测分析基础(线性回归/逻辑回归实现)6.2 预测分析基础——线性回归与逻辑回归实现6.2.1 预测分析核心理论框架1…...
【NLP】29. 高效训练与替代模型:让语言模型更轻、更快、更强
高效训练与替代模型:让语言模型更轻、更快、更强 本文介绍语言模型如何通过结构优化与新模型探索,提升训练和推理的效率,适应资源受限环境,同时概述了一些 Transformer 替代模型的最新进展。 一、如何让语言模型更高效?…...
【LaTeX+VSCode本地Win11编译教程】
LaTeXVSCode本地编译教程参考视频: LaTeXVSCode本地编译教程 下面提供一种Win11的Latex环境配置和设置方案,首先vscode安装参考博客:【VscodeGit教程】,然后准备安装Latex相关组件 在 https://miktex.org/download 下载 miktex 并…...
组合两个表 --- MySQL [Leetcode 题目详解]
目录 题目链接 往期相关基础内容讲解博客 题目详解 1. 题目内容 2. 解题思路 3. 代码编写 题目链接 // 175. 组合两个表 往期相关基础内容讲解博客 // 聚合查询和联合查询博客 题目详解 1. 题目内容 // 编写解决方案,报告 Person 表中每个人的姓、名、城市…...
STM32 PulseSensor心跳传感器驱动代码
STM32CubeMX中准备工作: 1、设置AD 通道 2、设置一个定时器中断,间隔时间2ms,我这里采用的是定时器7 3、代码优化01 PulseSensor.c文件 #include "main.h" #include "PulseSensor/PulseSensor.h"/******************…...
macOS 上是否有类似 WinRAR 的压缩软件?
对于习惯使用 Windows 的用户来说,WinRAR 是经典的压缩/解压工具,但 macOS 系统原生并不支持 RAR 格式的解压,更无法直接使用 WinRAR。不过,macOS 平台上有许多功能相似甚至更强大的替代工具,以下是一些推荐࿱…...
Java求职面试:Spring Boot与微服务的幽默探讨
Java求职者面试:技术与幽默的碰撞 场景概述 在某互联网大厂的面试现场,面试官严肃认真,程序员则是一个搞笑的水货角色。面试者名叫张伟,年龄28岁,硕士学历,拥有5年的Java开发经验。以下是面试的详细过程。…...
《数字图像处理(面向新工科的电工电子信息基础课程系列教材)》封面颜色空间一图的选图历程
禹晶、肖创柏、廖庆敏《数字图像处理(面向新工科的电工电子信息基础课程系列教材)》 学图像处理的都知道,彩色图像的颜色空间很多,而且又是三维,不同的角度有不同的视觉效果,MATLAB的图又有有box和没有box。…...
Docker 使用下 (二)
Docker 使用下 (二) 文章目录 Docker 使用下 (二)前言一、初识Docker1.1 、Docker概述1.2 、Docker的历史1.3 、Docker解决了什么问题1.4 、Docker 的优点1.5 、Docker的架构图 二、镜像三、容器四、数据卷4.1、数据卷的概念4.2 、…...
【群晖NAS】Docker + WebStation + DDNS 部署无端口号HTTPs WordPress
前言 群晖提供官方的DDNS服务,可以直接配置一个类似于xxxx.synology.me的DDNS解析IPv4/IPv6到自己的NAS;群晖还有Web Station应用可以配置Docker的端口号映射,但是他自己占用了80端口,如果给自己的应用手动指定其他端口号&#x…...
手机SIM卡打电话时识别对方按下的DTMF按键(二)
手机SIM卡打电话时识别对方按下的DTMF按键(二) --本地AI电话机器人 前言 书接上篇,在上一篇章《手机打电话时如何识别对方按下的DTMF按键的字符》中,我们从理论的角度来论述了DTMF的频率组成。并尝试使用400Kb左右的【TarsosDS…...
N-Gram 模型
N-Gram 模型 什么是N-Gram?为什么叫 N-Gram?N-Gram怎么知道下一个词可能是什么?N-Gram 能做什么?N-Gram的问题 本文回答了四个问题: 一、N-Gram是什么?二、N-Gram为什么叫N-Gram?三、N-Gram具体…...
【漫话机器学习系列】240.真正类率(True Positive Rate,TPR)
理解真正类率(True Positive Rate,TPR):公式、意义与应用 在机器学习与深度学习模型评估中,"真正类率"(True Positive Rate,简称TPR)是一个非常重要的指标。TPR反映了分类…...
ThreadLocal源码深度剖析:内存管理与哈希机制
ThreadLocal是Java并发编程中的重要工具,它为每个线程提供独立的变量存储空间,实现了线程之间的数据隔离。本文将从源码实现角度,深入分析ThreadLocal的内部机制,特别是强弱引用关系、内存泄漏问题、ThreadLocalMap的扩容机制以及…...
Softmax回归与单层感知机对比
(1) 输出形式 Softmax回归 输出是一个概率分布,通过Softmax函数将线性得分转换为概率: 其中 KK 是类别数,模型同时计算所有类别的概率。 单层感知机 输出是二分类的硬决策(如0/1或1): 无概率解释&#x…...
数字社会学家唐兴通谈数字行动主义网络行动主义与标签行动主义,理解它才算抓住AI社会学与网络社会学关键所在
让我们继续探讨一个在数字时代至关重要的概念——数字行动主义(Digital Activism)、网络行动主义(Cyberactivism)以及标签行动主义(Hashtag Activism)。我将尽力从一个数字社会学家的角度,抽丝剥…...
PandasAI:对话式数据分析新时代
PandasAI:对话式数据分析新时代 引言项目概述分析基本信息 核心功能详解1. 自然语言查询处理2. 数据可视化生成3. 多数据源集成分析4. 安全沙箱执行5. 云平台协作功能 安装和使用教程1.环境要求2.安装步骤3.基本使用方法4.切换其他LLM 应用场景和实际价值1.适用业务…...
全球化电商平台AWS云架构设计
业务需求: 支撑全球三大区域(北美/欧洲/亚洲)用户访问,延迟<100ms处理每秒50,000订单的峰值流量混合云架构整合本地ERP系统全年可用性99.99%满足GDPR和PCI DSS合规要求 以下是一个体现AWS专家能力的全球化电商平台架构设计方…...
Linux 怎么使用局域网内电脑的网络访问外部
一次性 export http_proxy"http://192.168.0.188:7890" export https_proxy"http://192.168.0.188:7890"一直生效 写入 ~/.bashrc(或 ~/.bash_profile) nano ~/.bashrc加入这一行: export http_proxy"http://19…...
Python-numpy中ndarray对象创建,数据类型,基本属性
numpy库 numpy中的数据结构ndarrayndarray中的dtypendarray中的dtype的指定方式创建ndarray及指定dtype从列表创建ndarray使用 np.empty(), np.zeros(), np.ones() 和 np.full() 创建特定值的数组使用 np.arange() 创建等差数列数组使用 np.linspace() 创建等差数组使用np.logs…...
Python从入门到高手8.2节-元组的常用操作符
目录 8.2.1 元组的常用操作符 8.2.2 []操作符: 索引访问元组 8.2.3 [:]操作符:元组的切片 8.2.4 操作符:元组的加法 8.2.5 *操作符:元组的乘法 8.2.6 元组的关系运算 8.2.7 in操作符:查找元素 8.2.8 五一她玩了个狗吃…...
Python内置函数
Python作为一门简洁强大的编程语言,提供了丰富的内置函数(Built-in Functions),这些函数无需导入任何模块即可直接使用。本文将介绍Python中最常用、最重要的内置函数,帮助初学者快速掌握这些强大的工具。 官方地址&a…...