EMQX学习笔记
MQTT简介
MQTT是一种基于发布订阅模式的消息传输协议
消息:设备和设备之间传输的数据,或者服务和服务之间传输的数据
协议:传输数据时所遵循的规则
轻量级:MQTT协议占用的请求源较少,数据报文较小
可靠较强:多种消息的质量等级
安全性较强:提供传输层和套接层加密功能
双向通讯:客户端既可以发送数据,也可以从代理软件中获取数据
docker安装emqx 5.7
docker pull emqx/emqx:5.7
mkdir -p /data/docker/emqx/data /data/docker/emqx/log /data/docker/emqx/etc#手动复制默认配置文件到宿主机
docker run -d --name emqx_temp emqx/emqx:5.7
docker cp emqx_temp:/opt/emqx/etc /data/docker/emqx
docker stop emqx_temp && docker rm emqx_tempdocker run -d --name emqx \-u root \-p 1883:1883 -p 8083:8083 \-p 8084:8084 -p 8883:8883 \-p 18083:18083 \-v /data/docker/emqx/data:/opt/emqx/data \-v /data/docker/emqx/log:/opt/emqx/log \-v /data/docker/emqx/etc:/opt/emqx/etc \emqx/emqx:5.7
#遇到权限问题:mkdir: cannot create directory ‘/opt/emqx/data/configs’: Permission denied
#加了参数 -u root,使用root身份启动
#--privileged=true#如果忘记密码,可以进入docker容器修改
docker exec -it emqx /bin/bash
./bin/emqx_ctl admins passwd admin public123
EMQX后台管理页面
http://192.168.1.131:18083
admin/public
MQTTX客户端
在官网下载安装客户端 https://mqttx.app/zh
Docker安装MQTTX
docker pull emqx/mqttx-web:v1.10.1
docker run --rm --name mqttx-web -p 80:80 emqx/mqttx-web:v1.10.1
访问 http://192.168.1.131
如果是MQTTX客户端连接,使用 mqtt://192.168.1.131:1883
如果使用Docker安装MQTTX的,只能在网页中配置连接:ws://192.168.1.131:8083
wireshark网络监听工具
https://www.wireshark.org/download.html
下载并安装
打开Wireshark,监听网卡VMware Network Adapter VMnet8
在过滤器中输入mqtt,使用MQTTX桌面版本连接,可以查看报文的详细数据
QOS
消息的质量等级
0:消息最多发送一次
1:消息至少发送一次
2:消息仅有一次发送
在发送消息的时候可以指定消息的质量等级
QOS = 0:即发即弃,不需要等待确认,不需要存储和重传
QOS = 1:引用了应答和重传机制,在发送消息时缓存报文,报文前中包含Message Identifier,在应答ack中返回Message Identifier,删除缓存报文
QOS = 2:在接受端收到PUBREL消息之前,会缓存Packet ID,可以过滤重复消息
主题
对消息进行分类
不建议以 / 开头或结尾
单层通配符+ :必须占据整个层级 test/+ 或者 test/+/temperature
多层通配符# :必须是占据整个层级且是主题的最后一个字符 #或者test/#
系统主题#SYS/ :获取MQTT服务器自身运行状态、消息统计、客户端上下级事件等数据
参数配置
Clean Start:客户端在和服务器建立连接的时候尝试恢复之前的会话或者直接创建全新的会话
0:之前如果有连接,会尝试恢复之前的会话(可以接受到该客户端离线时,发布者后面发布的消息)
1:创建全新会话
Session Expiry Interval:决定会话状态数据在服务端的存储时长
0:会话在网络连接断开时立即结束
大于0:会话将在网络连接断开的多少秒之后过期
以上是MQTT会话为离线客户端缓存消息的能力
保留消息
普通消息:普通消息在发送前如果该主题不存在订阅者,MQTT服务器会直接将丢弃
保留消息:保留消息可以保留在MQTT服务器中,新的订阅者如果主题匹配,立接收到该消息
MQTT服务器会为每个主题存储最新一条保留消息
在保留消息发布前订阅主题,将不会收到保留消息(当普通消息接收)
使用场景
传感串上报数据间隔时间长,但订阅者需要在订阅后立即获取到新的数据
传感器的版本号、序列号等不会经常变更的属性
保留消息的删除
- 发送一条空的保留消息
- 在Dashboard页面删除
- 在发送保留消息时,设置保留消息的过期时间
发送消息时,可以指定消息过期时间
假如客户端意外离线,重新连接时如果消息已过期,则获取不到这条消息了(消息时效性:秒)
遗嘱消息
客户端可以在连接服务端中注册一个遗嘱消息,当该客户端意外断开连接,服务端就会向其他订阅了相应主题的客户端发送些遗嘱消息
Will Delay Interval:服务端将在网络连接关闭后延迟多久发布遗嘱消息(秒)
需要在连接前配置遗嘱消息
如果会话有效时间小于遗嘱延迟时间,则在会话结束前发送遗嘱消息
延迟发布
MQTT服务端收到发布者发布的消息以后,延迟一段时间以后再把消息转发给订阅者
延迟主题格式 ${delayed}/{DelayInterval}/{TopicName} 单位秒
Dashboard =》监控 =》延迟发布 =》设置,启用延迟发布
用户属性
用来发送一些自定义的内容
订阅选项
No local 服务端是否可以将消息转发给发布这个消息的客户端(默认值0:可以)
在桥接场景中,需要配置成1:不可以转发避免死循环
自动订阅:Dashboard =》MQTT高级特性 =》自动订阅,添加
黑名单:封禁某些客户端的访问
连接抖动检测:自动封禁那些被检测到短时间内频繁登录的客户端
共享订阅
相当于消费组,每个组消费同一个主题里所有的消息,一个组里面使用轮循等策略消费消息(并行消费及高可用性)
带群组的共享订阅:$share/<group-name>/{TopicName}
不带群组格式:$queue/{TopicName}
(相当于同一个消费组,组内并行消费)
排它订阅
一个主题同一时刻仅被允许存在一个订阅者 $exclusive/{TopicName}
使用docker compose创建Kafka测试环境
启动命令:docker compose -f docker-compose-kafka.yml up -d
services:zookeeper:image: wurstmeister/zookeeperhostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOO_MY_ID: 1ZOO_SERVERS: server.1=zookeeper:2888:3888;2181networks:- kafka-netkafka:image: wurstmeister/kafkahostname: kafkacontainer_name: kafkadepends_on:- zookeeperports:- "9092:9092"- "29092:29092"environment:KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.131:9092,PLAINTEXT_HOST://192.168.1.131:29092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"networks:- kafka-netkafka-eagle:image: nickzurich/efakcontainer_name: kafka-eagleports:- "8048:8048"environment:EFAK_DB: h2CLUSTER_ZK_LIST: zookeeper:2181/cluster1CLUSTER_KAFKA_BOOTSTRAP_SERVERS: kafka:9092depends_on:- kafkanetworks:- kafka-netnetworks:kafka-net:driver: bridgevolumes:kafka-data: {}zookeeper-data: {}
Kafka管理后台
http://192.168.1.131:8048
admin/123456
创建主题:Topics =》Create,Topic Name:test_mqtt_topic
Java代码发送kafka消息
@SpringBootTest
class MqttKafkaDemoApplicationTests {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void testSendMsg() {kafkaTemplate.send("test_mqtt_topic", "hello,kafka");}
}
在kafka eagle的Topics =》List,点击进test_mqtt_topic里面,在右侧的Preview中,可以看到最近发送的消息
关闭服务 docker compose -f docker-compose-kafka.yml down
数据集成
使用Sink与Source与外部数据系统对接
Sink用于将消息从broker发送到外部数据系统
Source用于从外部系统接收消息
规则引擎:数据来源、数据处理过程、处理结果去向
数据集成示例
将客户端发往t/a主题中的消息输出到EMQX的控制台
#Dashboard =》集成 =》规则 =》创建,SQL编辑器
SELECT * FROM "t/a"
#数据输入为消息主题t/a
#动作输出选择控制台输出
SQL语法介绍
SELECT <字段名> FROM <主题> [WHERE <条件>]SELECT a,b FROM 't/#' //未知的列会返回undefined
SELECT * FROM '#' WHERE username = 'abc'
SELECT clientid as cid FROM '#' WHERE cid = 'abc'
SELECT clientid as cid FROM '#' WHERE username = 'abc'
SELECT clientid as cid, payload, topic, qos FROM "t/a"FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]
FOREACH 处理数组数据FOREACH payload.sensors as e
DO clientid, e.name as name, e.idx as idx
INCASE e.idx >= 1 #对DO选择出来的某个字段施加条件过滤
from "t/b"
t/b测试数据
{"date": "2025-04-15","sensors": [{"name": "a", "idx": 0},{"name": "b", "idx": 1},{"name": "c", "idx": 2}]
}
添加规则
FOREACH payload.sensors from "t/b"
添加动作类型:消息重发布
主题:sensors/${item.idx}
内容:${item.name}
添加3个订阅:sensors/1,sensors/2,sensors/3
使用DO简化输出结果
FOREACH payload.sensors as e
DO e.idx as idx, e.name as namefrom "t/c"
再次添加动作类型:消息重发布
主题:sensors/ i d x 内容: {idx} 内容: idx内容:{name}
CASE-WHEN语句
SELECT CASE WHEN payload.x < 0 THEN 0
WHEN payload.x > 7 THEN 7
ELSE payload.x
END as x
FROM "t/abc"
内置函数
SELECT abs(-1) as x,
concat(payload.msg, ' goods') as name
FROM "t/aaa"
Webhook
Webhook将EMQX客户端消息和事件发送到外部HTTP服务中
@RestController
@RequestMapping("webHook")
public class WebHookController {@PostMapping("notify")public void notifyMsg(@RequestBody Map<Object, Object> body) {System.out.println(body);}
}
在Dashboard =》集成 =》Webhook 中创建 Webhook
名称notify_WH_D,触发器:消息发布,过滤主题:t/1
http://192.168.0.199:8080/webHook/notify
docker查看挂载的磁盘信息
docker volume ls
docker volume inspect emqx_log
在Vue中使用MQTT
npm create vite@latest
npm install
npm install element-plus --save
npm install mqtt --save
修改main.js
import { createApp } from 'vue'
import './style.css'
import App from './App.vue'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'const app = createApp(App)
app.use(ElementPlus)
app.mount('#app')
添加页面MqttDemo.vue,连接emqx订阅发送消息
<script setup>
import { ref, reactive } from 'vue'
import mqtt from 'mqtt'const qosList = [0, 1, 2]
// 定义连接参数的对象
const connectInfo = ref({protocol: 'ws',host: '192.168.1.131',port: '8083',clientId: 'emqx_vue_client_' + Math.random().toString().substring(2, 8),username: 'zhangsan',password: '123456'
})const clientInitData = ref({connected: false
})
const client = ref({})
const createConnection = () => {const { protocol, host, port, ...options } = connectInfo.valueconst connectUrl = `${protocol}://${host}:${port}/mqtt`client.value = mqtt.connect(connectUrl, options)clientInitData.value.connected = trueconsole.log('连接建立成功了')
}const closeConnection = () => {client.value.end(false, () => {clientInitData.value.connected = falseconsole.log('连接关闭成功了')})}const subscriptionInfo = ref({topic: '',qos: 0
})const receivedMessages = ref({})
const subscriptionInitData = ref({subscription: false
})
const subscriptionTopicHandler = () => {const { topic, qos } = subscriptionInfo.valueclient.value.subscribe(topic, { qos }, (error, res) => {if (error) {console.log('主题订阅失败了', error)return}subscriptionInitData.value.subscription = true//给链接对象注册一个接收消息的事件client.value.on('message', (topic, message) => {console.log('接收到消息:', topic, message)receivedMessages.value = topic + '--->' + message})})
}const unSubscriptionTopicHandler = () => {const { topic, qos } = subscriptionInfo.valueclient.value.unsubscribe(topic, { qos }, (error, res) => {if (error) {console.log('主题取消订阅失败了', error)return}subscriptionInitData.value.subscription = false})
}const publishInfo = ref({topic: '',qos: 0,payload: ''
})const doPublish = () => {const { topic, qos, payload } = publishInfo.valueclient.value.publish(topic, payload, { qos }, (error, res) => {if (error) {console.log('发送消息失败了', error)return}})
}</script><template><el-card><h4>配置信息</h4><el-form label-width="120px"><el-row :gutter="20"><el-col :span="8"><el-form-item label="协议" prop="protocol"><el-select v-model="connectInfo.protocol"><el-option label="ws://" value="ws"></el-option><el-option label="wss://" value="wss"></el-option></el-select></el-form-item></el-col><el-col :span="8"><el-form-item label="主机地址" prop="host"><el-input v-model="connectInfo.host"></el-input></el-form-item></el-col><el-col :span="8"><el-form-item label="端口" prop="port"><el-input v-model="connectInfo.port"></el-input></el-form-item></el-col></el-row><el-row :gutter="20"><el-col :span="8"><el-form-item label="clientId" prop="clientId"><el-input v-model="connectInfo.clientId"></el-input></el-form-item></el-col><el-col :span="8"><el-form-item label="用户名" prop="username"><el-input v-model="connectInfo.username"></el-input></el-form-item></el-col><el-col :span="8"><el-form-item label="密码" prop="password"><el-input v-model="connectInfo.password"></el-input></el-form-item></el-col></el-row><el-row :gutter="20"><el-col :span="24"><el-button type="primary" :disabled="clientInitData.connected"@click="createConnection">建立连接</el-button><el-button type="danger" :disabled="!clientInitData.connected"@click="closeConnection">断开连接</el-button></el-col></el-row></el-form></el-card><el-card><h4>订阅主题</h4><el-form label-width="120px"><el-row :gutter="20"><el-col :span="8"><el-form-item label="Topic" prop="topic"><el-input v-model="subscriptionInfo.topic"></el-input></el-form-item></el-col><el-col :span="8"><el-form-item label="Qos" prop="qos"><el-select v-model="subscriptionInfo.qos"><el-option v-for="item in qosList" :label="item" :value="item"></el-option></el-select></el-form-item></el-col><el-col :span="8"><el-button type="primary" :disabled="subscriptionInitData.subscription"@click="subscriptionTopicHandler">订阅主题</el-button><el-button type="warning" :disabled="!subscriptionInitData.subscription"@click="unSubscriptionTopicHandler">取消订阅</el-button></el-col></el-row></el-form></el-card><el-card><h4>发布消息</h4><el-form label-width="120px"><el-row :gutter="20"><el-col :span="8"><el-form-item label="Topic" prop="topic"><el-input v-model="publishInfo.topic"></el-input></el-form-item></el-col><el-col :span="8"><el-form-item label="Qos" prop="field5"><el-select v-model="publishInfo.qos"><el-option v-for="item in qosList" :label="item" :value="item"></el-option></el-select></el-form-item></el-col><el-col :span="8"><el-form-item label="Payload" prop="payload"><el-input v-model="publishInfo.payload"></el-input></el-form-item></el-col></el-row><el-row :gutter="20"><el-col :span="24"><el-button type="primary" @click="doPublish">发布消息</el-button></el-col></el-row></el-form></el-card><el-card><h4>接收到的消息</h4><el-form><el-input v-model="receivedMessages" style="width: 98%" :rows="5" type="textarea" /></el-form></el-card></template><style scoped>
.el-row {margin-bottom: 20px;
}.el-col {border-radius: 4px;
}
</style>
Java中使用客户端连接EMQX
- 在pom.xml中引入依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency>
- 添加测试类
@SpringBootTest
class MqttSpringDemoApplicationTests {@Testvoid contextLoads() {}@Testpublic void testSendMsg() throws MqttException {String serverURI = "tcp://192.168.1.131:1883";String clientId = "paho_client_123";MemoryPersistence memoryPersistence = new MemoryPersistence();MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);MqttConnectOptions options = new MqttConnectOptions();options.setUserName("zhangsan");options.setPassword("123456".toCharArray());// 创建新的连接options.setCleanSession(true);mqttClient.connect(options);System.out.println("连接创建成功了");MqttMessage mqttMessage = new MqttMessage("hello mqtt".getBytes());mqttMessage.setQos(0);mqttClient.publish("a/c", mqttMessage);System.out.println("消息发送成功");mqttClient.disconnect();mqttClient.close();}@Testpublic void testReceiveMsg() throws MqttException {String serverURI = "tcp://192.168.1.131:1883";String clientId = "paho_client_123";MemoryPersistence memoryPersistence = new MemoryPersistence();MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);MqttConnectOptions options = new MqttConnectOptions();options.setUserName("zhangsan");options.setPassword("123456".toCharArray());// 创建新的连接options.setCleanSession(true);mqttClient.connect(options);System.out.println("连接创建成功了");mqttClient.subscribe("a/d", 2);mqttClient.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("接收到消息 " + topic + " " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}});while (true);}@Testpublic void testCreateConnection() throws MqttException {String serverURI = "tcp://192.168.1.131:1883";String clientId = "paho_client_123";MemoryPersistence memoryPersistence = new MemoryPersistence();MqttClient mqttClient = new MqttClient(serverURI, clientId, memoryPersistence);MqttConnectOptions options = new MqttConnectOptions();options.setUserName("zhangsan");options.setPassword("123456".toCharArray());// 创建新的连接options.setCleanSession(true);mqttClient.connect(options);System.out.println("连接创建成功了");while (true);}
}
SpringBoot中通过SpringIntegration接入EMQX
在SpringBoot中通过SpringIntegration接入EMQX(MQTT消息服务器)可以更高效地管理数据流
利用SpringIntegration的通道(Channel)、适配器(Adapter)和消息处理器(MessageHandler)实现复杂的消息路由、转换和聚合。
关键组件说明
组件 | 作用 |
---|---|
MqttPahoMessageDrivenChannelAdapter | 入站适配器:订阅 EMQX 主题并将消息传递到输入通道。 |
MqttPahoMessageHandler | 出站适配器:从输出通道接收消息并发布到 EMQX 主题。 |
MessageChannel | 通道:连接适配器和处理器的管道,支持同步 / 异步模式。 |
@ServiceActivator | 服务激活器:处理通道中的消息并触发业务逻辑。 |
@MessagingGateway | 消息网关:提供接口简化消息发送到出站通道。 |
- pom.xml中引入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-spring-boot3-starter</artifactId><version>3.5.10.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.55</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.13</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope></dependency></dependencies>
- 配置项目参数
修改application.properties
spring.application.name=spring-integration-mqtt
spring.mqtt.username=zhangsan
spring.mqtt.password=123456
spring.mqtt.url=tcp://192.168.1.131:1883
spring.mqtt.subClientId=sub_client_id_123
spring.mqtt.subTopic=iot/lamp/line,iot/lamp/device/status
spring.mqtt.pubClientId=pub_client_id_123
spring.mqtt.apiUrl=http://192.168.1.131:18083
spring.mqtt.secretKey=6l6rxxo2trs3QJjfK0OUhPIzjKlOqAOlnhniCuhonsI
spring.mqtt.apiKey=99d1ecc382aa4d58spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.1.131:3306/lamp_test?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=rootmybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis-plus.configuration.map-underscore-to-camel-case=true
mybatis-plus.mapper-locations=classpath*:/mapper/*Mapper.xmlhttpclient.max-total=200
httpclient.default-max-per-route=20
httpclient.connect-timeout=10000
httpclient.socket-timeout=10000
配置类加载数据
@Configuration
public class MqttConfig {@Autowiredprivate MqttConfigProperties mqttConfigProperties;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});options.setCleanSession(true);options.setUserName(mqttConfigProperties.getUsername());options.setPassword(mqttConfigProperties.getPassword().toCharArray());clientFactory.setConnectionOptions(options);return clientFactory;}
}
@Configuration
public class HttpClientConfig {@Value("${httpclient.max-total}")private int maxTotal;@Value("${httpclient.default-max-per-route}")private int maxPerRoute;@Value("${httpclient.connect-timeout}")private int connectTimeout;@Value("${httpclient.socket-timeout}")private int socketTimeout;@Beanpublic PoolingHttpClientConnectionManager poolingHttpClientConnectionManager() {PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();manager.setMaxTotal(maxTotal);manager.setDefaultMaxPerRoute(maxPerRoute);return manager;}@Beanpublic CloseableHttpClient httpClient(PoolingHttpClientConnectionManager manager) {RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout).setSocketTimeout(socketTimeout).build();return HttpClients.custom().setConnectionManager(manager).setDefaultRequestConfig(requestConfig).build();}
}
自动加载配置信息,创建文件
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
#内容每行是一个类
com.achieve.mqtt.domain.MqttConfigProperties
- 入站适配器:订阅消息
@Configuration
public class MqttInboundConfig {@Autowiredprivate MqttConfigProperties mqttConfigProperties;@Autowiredprivate MqttPahoClientFactory mqttClientFactory;@Autowiredprivate ReceiverMessageHandler receiverMessageHandler;// 定义消息通道(订阅消息)@Beanpublic MessageChannel messageInboundChannel() {return new DirectChannel();}/*** 配置入站适配器,设置订阅主题,以及指定消息的相关属性*/@Beanpublic MessageProducer messageProducer() {MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter =new MqttPahoMessageDrivenChannelAdapter(mqttConfigProperties.getUrl(),mqttConfigProperties.getSubClientId(),mqttClientFactory,mqttConfigProperties.getSubTopic().split(","));mqttPahoMessageDrivenChannelAdapter.setQos(1);mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());return mqttPahoMessageDrivenChannelAdapter;}// 服务激活器:处理输入通道消息@Bean@ServiceActivator(inputChannel = "messageInboundChannel")public MessageHandler messageHandler() {return receiverMessageHandler;}
}
@Component
public class ReceiverMessageHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String payload = message.getPayload().toString();MessageHeaders headers = message.getHeaders();Object topicName = headers.get("mqtt_receivedTopic").toString();System.out.println(payload);System.out.println(message);System.out.println(topicName);}
}
- 从输出通道接收消息并发布到EMQX主题
@Configuration
public class MqttOutboundConfig {@Autowiredprivate MqttConfigProperties mqttConfigProperties;@Autowiredprivate MqttPahoClientFactory mqttClientFactory;// 定义消息通道(发布消息)@Beanpublic MessageChannel messageOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "messageOutboundChannel")public MessageHandler mqttOutboundMessageHandler() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getUrl(),mqttConfigProperties.getPubClientId(),mqttClientFactory);messageHandler.setAsync(true);messageHandler.setDefaultTopic("default");messageHandler.setDefaultQos(0);return messageHandler;}
}
- 消息网关:提供接口简化消息发送到出站通道
//网关接口发送消息(用于发送消息到出站通道)
@MessagingGateway(defaultRequestChannel = "messageOutboundChannel")
public interface MqttGateway {void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload);
}
@Component
public class MqttMessageSender {@Autowiredprivate MqttGateway mqttGateway;public void sendMsg(String topic, String payload) {mqttGateway.sendMsgToMqtt(topic, payload);}public void sendMsg(String topic, int qos, String payload) {mqttGateway.sendMsgToMqtt(topic, qos, payload);}
}
- 测试发送消息
@SpringBootTest
class SpringIntegrationMqttApplicationTests {@Autowiredprivate MqttMessageSender mqttMessageSender;@Testpublic void testSendMsg() {mqttMessageSender.sendMsg("a/e", "hello world");}
}
- 查询设备在线状态
@RestController
@RequestMapping("/api/lamp")
@Slf4j
public class LampApiController {@Autowiredprivate MqttMessageSender mqttMessageSender;@Autowiredprivate MqttConfigProperties mqttConfigProperties;@Autowiredprivate CloseableHttpClient httpClient;@GetMapping(value = "{deviceId}/{status}")public String sendStatusLampMsg(@PathVariable String deviceId, @PathVariable Integer status) {Map<String, Object> map = Map.of("deviceId", deviceId, "status", status);String json = JSON.toJSONString(map);mqttMessageSender.sendMsg("iot/lamp/server/line", json);return "ok";}/*** 查询设备是否在线,v4和v5版本返回值不同;认证使用Basic方式* @param clientId* @return*/@GetMapping(value = "online/{clientId}")public boolean isClientOnline(@PathVariable String clientId) {String apiUrl = mqttConfigProperties.getApiUrl() + "/api/v5/clients/" + clientId;HttpGet httpGet = new HttpGet(apiUrl);httpGet.setHeader(HttpHeaders.ACCEPT, "application/json");String auth = Base64.getEncoder().encodeToString((mqttConfigProperties.getApiKey() + ":" + mqttConfigProperties.getSecretKey()).getBytes());httpGet.setHeader("Authorization", "Basic " + auth);try (CloseableHttpResponse response = httpClient.execute(httpGet)) {int statusCode = response.getStatusLine().getStatusCode();HttpEntity entity = response.getEntity();if (statusCode == HttpStatus.SC_OK && entity != null) {String json = EntityUtils.toString(entity, StandardCharsets.UTF_8);log.info("isClientOnline data {}", json);JSONObject jsonObject = JSON.parseObject(json);return jsonObject.getBoolean("connected");} else if (statusCode == HttpStatus.SC_NOT_FOUND) {return false; // 设备不存在} else {throw new RuntimeException("EMQX API 请求失败: HTTP " + statusCode);}} catch (IOException e) {throw new RuntimeException("网络通信异常", e);}}
}
测试地址及数据
http://localhost:8080/api/lamp/online/device-123
#上线
iot/lamp/line
{"deviceId": "device-123456","status": 1
}#服务器下发指令 http://localhost:8080/api/lamp/device-123456/1
iot/lamp/server/line
{"deviceId": "device-123456","status": 1
}#上报状态
iot/lamp/device/status
{"deviceId": "device-123456","status": 1
}
源代码地址
https://gitee.com/galen.zhang/mqtt-demo
相关文章:
EMQX学习笔记
MQTT简介 MQTT是一种基于发布订阅模式的消息传输协议 消息:设备和设备之间传输的数据,或者服务和服务之间传输的数据 协议:传输数据时所遵循的规则 轻量级:MQTT协议占用的请求源较少,数据报文较小 可靠较强ÿ…...
组件是怎样写的(1):虚拟列表-VirtualList
本篇文章是《组件是怎样写的》系列文章的第一篇,该系列文章主要说一下各组件实现的具体逻辑,组件种类取自 element-plus 和 antd 组件库。 每个组件都会有 vue 和 react 两种实现方式,可以点击 https://hhk-png.github.io/components-show/ …...
CGAL 计算直线之间的距离(3D)
文章目录 一、简介二、实现代码三、实现效果一、简介 这里的计算思路很简单: 1、首先将两个三维直线均平移至过原点处,这里两条直线可以构成一个平面normal。 2、如果两个直线平行,那么两条直线之间的距离就转换为直线上一点到另一直线的距离。 3、如果两个直线不平行,则可…...
定期检查滚珠丝杆的频率是多久?
定期检查滚珠丝杆的频率通常是每半年进行一次,根据不同的使用环境和设备类型,滚珠丝杆的检查周期有所不同。接下来我们一起看看滚珠丝杆的维护保养方法: 1、清洗:每隔一段时间对滚珠丝杆进行清洁,将滚珠丝杆拆…...
Spark-SQL连接Hive全攻略
在大数据处理领域,Spark-SQL与Hive的结合能发挥强大的功能。今天就来给大家分享一下Spark-SQL连接Hive的多种方式。 Spark SQL编译时可选择包含Hive支持,这样就能使用Hive表访问、UDF、HQL等特性,而且无需提前安装Hive。其连接方式丰富多样…...
在Ubuntu 18.04下编译OpenJDK 11
在Ubuntu 18.04下编译OpenJDK 11 源码下载地址: 链接: https://pan.baidu.com/s/1QAdu-B6n9KqeBakGlpBS3Q 密码: 8lho Linux下的环境要求 不同版本的jdk会要求在不同版本的Ubuntu下编译,不要用太高版本的Ubuntu或者gcc,特别是gcc…...
Spring MVC 一个简单的多文件上传
原始代码逐行解释 PostMapping("/uploads") // ① 声明处理POST请求,路径为"/uploads" ResponseBody // ② 直接返回数据到响应体,不进行视图解析 public String uploads(MultipartFile[] files, // …...
FreeRTos学习记录--1.工程创建与源码概述
1.工程创建与源码概述 1.1 工程创建 使用STM32CubeMX,可以手工添加任务、队列、信号量、互斥锁、定时器等等。但是本课程不想严重依赖STM32CubeMX,所以不会使用STM32CubeMX来添加这些对象,而是手写代码来使用这些对象。 使用STM32CubeMX时&…...
Vmware esxi 给现有磁盘增加空间后并扩展系统里磁盘空间
当前EXSI上虚拟机所在的单独数据磁盘空间满了,需要对空间进行扩容,我们先在主机对磁盘容量进行调整,然后在系统里面对磁盘空间进行拓展,这些操作需要保留数据并且不改变现有的磁盘格局。 遵循大致操作流程是: 1.先登录…...
Linux基础学习--linux的文件权限与目录配置
linux的文件权限与目录配置 1.用户与用户组 在Linux中,每个文件都有相当多的属性和权限,其中最重要的概念就是文件的拥有者。 1.1 文件拥有者 Linux是一个多人多任务的系统,常常有多人共用一台主机的情况出现,因此在系统中可以…...
LLM大模型中的基础数学工具—— 约束优化
Q26: 推导拉格朗日乘子法 的 KKT 条件 拉格朗日乘子法与 KKT 条件是啥? 拉格朗日乘子法是解决约束优化问题的利器。比如,想最小化函数 ,同时满足约束 ,就构造拉格朗日函数 ( 是乘子)。KKT 条件是解这类问…...
涨薪技术|0到1学会性能测试第20课-关联技术
前面的推文我们掌握了性能测试脚本开发参数化技术一系列知识,今天开始给大家分享关联技术知识,后续文章都会系统分享干货,带大家从0到1学会性能测试! 关联是LoadRunner中一个很重要的应用,对于初学者来说也是最容易犯错的地方,但是很遗憾的是,并没有任何特定的错误与关联…...
SpringAI入门示例
AI编程简介 纯Prompt模式 纯Prompt模式是AI编程中最基础的交互架构。用户通过输入自然语言文本(即Prompt)向AI模型发出指令,模型依据自身预训练所积累的知识和语言理解能力,直接生成相应的文本响应。其工作原理是,用…...
SQL 中 ROLLUP 的使用方法
ROLLUP 是 SQL 中一种分组操作,它生成多个分组集的小计行和总计行,提供层次化的汇总数据。 基本语法 SELECT column1, column2, ..., aggregate_function(column) FROM table GROUP BY ROLLUP (column1, column2, ...); 使用示例 假设有一个销售表 sal…...
Web前端:Overflow属性(超出裁剪属性)
一、什么是 Overflow? 在网页布局中,容器(如 <div>、<section> 等)通常有固定尺寸(如 width 和 height)。当容器内的内容(文本、图片等)超出容器边界时,就会…...
20250421在荣品的PRO-RK3566开发板的Android13下使用io命令控制GPIO
20250421在荣品的PRO-RK3566开发板的Android13下使用io命令控制GPIO 2025/4/21 10:44 【本文只打开了io命令。通过io控制GPIO放到下一篇了】 缘起:需要在荣品的PRO-RK3566开发板的Android13的u-boot中来控制GPIO3A1【配置以太网RTL8211F-CG】。 直接使用GPIO库函数 …...
20250421在荣品的PRO-RK3566开发板的Android13下频繁重启RKNPU fde40000.npu: Adding to iommu gr
20250421在荣品的PRO-RK3566开发板的Android13下频繁重启RKNPU fde40000.npu: Adding to iommu gr 2025/4/21 14:50 缘起:电池没电了,导致荣品的PRO-RK3566的核心板频繁重启。 内核时间4s就重启。100%复现。 PRO-RK3566 Android13启动到这里 复位&#…...
在 8MHz 的时钟电路中挂接电阻,电容
匹配电阻:在晶体振荡电路中,用于匹配晶体和振荡电路的阻抗,确保振荡的稳定性,阻值通常在几十千欧到几百千欧,例如 1MΩ、33KΩ、47KΩ 等。 在一些电子电路中,尤其是涉及到时钟信号的产生和传输时…...
卸载工具:IObit Uninstaller Pro v14.3.0 中文绿色专业便携版
IObit Uninstaller 是一种功能强大的卸载工具,可帮助您快速方便地从计算机中移除不需要的程序和文件夹。它不仅仅可以从计算机中卸载应用程序,还可以移除它们的卸载残留。可以检测和分类所有已安装的程序,并可以批量卸载,只需一键…...
【目标检测】目标检测综述 目标检测技巧
I. 目标检测中标注的关键作用 A. 目标检测数据标注的定义 目标检测是计算机视觉领域的一项基础且核心的任务,其目标是在图像或视频中准确识别并定位出预定义类别的目标实例 1。数据标注,在目标检测的语境下,指的是为原始视觉数据࿰…...
c++基础·move作用,原理
目录 一、代码结构概览 二、逐层解析实现逻辑 1. 模板参数推导 2. 返回类型设计 3. 类型转换逻辑 三、关键特性与设计思想 1. 移动语义的本质 2. 为何必须用 remove_reference 3. 万能引用的兼容性 四、边界场景与注意事项 1. 对 const 对象的处理 2. 返回值优化&a…...
考研系列-计算机网络-第四章、网络层
一、网络层的概述和功能 1.功能概述 2.SDN的基本概念...
服务器在国外国内用户访问慢会影响谷歌排名吗?
谷歌明确将“页面加载速度”和“用户体验”作为排名核心指标,但当服务器物理距离过远时,国内用户动辄3秒以上的加载延迟,可能导致跳出率飙升、爬虫抓取困难等连锁反应。 但盲目将服务器迁回国内,又会面临备案成本、运维门槛等新难…...
iFable,AI角色扮演互动平台,自动生成沉浸式故事游戏
iFable是什么 iFable 是一个以动漫角色为主题的互动角色扮演游戏平台,旨在为用户提供沉浸式的故事冒险体验。平台允许玩家通过简单的创意输入,利用AI技术生成独特的互动故事与游戏体验。iFable 的设计宗旨在于帮助玩家与虚拟角色建立情感连接࿰…...
Nginx反向代理用自定义Header参数
【啰嗦两句】 也不知道为啥,我仅仅想在Nginx的反向代理中使用自己定义的“x-api-key”做Header参数,却发现会被忽略,网上搜的资料都是说用“proxy_set_header”,却只愿意介绍最基本的几个参数,你懂的,那些资…...
Spark SQL概述(专业解释+生活化比喻)
专业解释 一、什么是Spark SQL? 一句话定义: Spark SQL是Apache Spark中专门处理结构化数据的模块,可以让你像操作数据库表一样处理数据,支持用SQL查询或编程API(DataFrame/DataSet)分析数据。 通俗理解…...
LX3-初识是单片机
初识单片机 一 什么是单片机 单片机:单片微型计算机单片机的组成:CPU,RAM(内存),flash(硬盘),总线,时钟,外设…… 二 Coretex-M系列介绍 了解ARM公司与ST公司ARM内核系列: A 高性能应用,如手机,电脑…R 实时性强,如汽车电子,军工…M 超低功耗,如消费电子,家电,医疗器械 三…...
第二章 Logback的架构(一)
Logback的架构 Logback作为一个通用框架,可以应对不同场景的日志记录。目前,Logback 被划分为三个模块:logback-core、logback-classic 和 logback-access。 Logback的core模块为其他两个模块提供基础支持。classic模块扩展了core模块&…...
开发指南:构建结合数字孪生、大语言模型与知识图谱的智能设备日志分析及生产异常预警系统
1. 引言:数字孪生、大语言模型与知识图谱在智能制造中的融合 智能制造和工业4.0的浪潮正在重塑全球制造业格局,其核心在于利用先进的数字技术实现生产过程的实时决策、效率提升、灵活性增强和敏捷性改进。在这一转型过程中,数字孪生…...
【TeamFlow】4.1 Git使用指南
以下是 Git 在 Windows 系统上的配置和使用指南,包含详细步骤和注意事项: 安装 Git for Windows 下载与安装 前往 Git 官网 下载 Windows 版安装包 双击安装,关键选项建议: 选择 Use Git from Git Bash only(推荐&…...
HADOOP 3.4.1安装和搭建(尚硅谷版~)
目录 1.配置模版虚拟机 2.克隆虚拟机 3.在hadoop102安装JDK 4.完全分布式运行模式 1.配置模版虚拟机 1.安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存2G、硬盘20G(有需求的可以配置4G内存,50G硬盘) 2.hado…...
通过Docker Desktop配置OpenGauss数据库的方法(详细版+图文结合)
文章目录 通过Docker Desktop配置OpenGauss数据库的方法**一、下载Docker Desktop,并完成安装**docker官网:https://www.docker.com/ **二、下载OpenGauss压缩包**安装包下载链接:https://opengauss.obs.cn-south-1.myhuaweicloud.com/7.0.0-…...
文件有几十个T,需要做rag,用ragFlow能否快速落地呢?
一、RAGFlow的优势 1、RAGFlow处理大规模数据性能: (1)、RAGFlow支持分布式索引构建,采用分片技术,能够处理TB级数据。 (2)、它结合向量搜索和关键词搜索,提高检索效率。 …...
SystemVerilog语法之内建数据类型
简介:SystemVerilog引进了一些新的数据类型,具有以下的优点:(1)双状态数据类型,更好的性能,更低的内存消耗;(2)队列、动态和关联数组,减少内存消耗…...
TensorFlow和PyTorch学习原理解析
这里写目录标题 TensorFlow和PyTorch学习&原理解析TensorFlow介绍原理部署适用场景 PyTorch介绍原理部署适用场景 Keras模型格式SavedModelONNX格式 TensorFlow和PyTorch学习&原理解析 TensorFlow 介绍 由 Google Brain 团队开发并于 2015 年开源。由于 Google 的强…...
悬空引用和之道、之禅-《分析模式》漫谈57
DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 “Analysis Patterns”的第5章“对象引用”原文: Unless you can catch all such references, there is the risk of a dangling reference, which often has painful con…...
江湖密码术:Rust中的 bcrypt 加密秘籍
前言 江湖险恶,黑客如雨,昔日密码“123456”早被各路大侠怒斥为“纸糊轻功”。若还执迷不悟,用明文密码闯荡江湖,无异于身披藏宝图在集市上狂奔,目标大到闪瞎黑客双眼。 为护你安然度过每一场数据风波,特献上一门绝学《Rust加密神功》。核心招式正是传说中的 bcrypt 密…...
NLP高频面试题(四十八)大语言模型中的思维链(CoT)技术详解
引言 大语言模型(LLM)在近年的飞速发展,让机器在各种任务上表现出令人瞩目的能力。然而,与人类不同,传统的语言模型往往倾向于直接给出答案,而缺乏可解释的中间推理过程。这在复杂推理任务中成为瓶颈:模型可能由于一步推理不当而得出错误结论,却没有过程可供检查。为了…...
对接点餐接口需要有哪些准备?
以下是一般点餐接口对接的相关信息,包括常见的接口功能、对接步骤及注意事项等: 常见接口功能 餐厅信息查询:获取合作餐厅的基本信息,如餐厅名称、地址、营业时间、联系电话、菜单等。菜品查询:查询具体餐厅的菜品详情…...
LintCode第192题-通配符匹配
描述 给定一个字符串 s 和一个字符模式 p ,实现一个支持 ? 和 * 的通配符匹配。匹配规则如下: ? 可以匹配任何单个字符。* 可以匹配任意字符串(包括空字符串)。 两个串完全匹配才算匹配成功。 样例 样例1 输入: "aa&q…...
uv运行一个MCP Server的完整流程
uv是一个高性能的Python包管理器,专注于性能提升。与pip相比,uv利用全局模块缓存,减少磁盘空间使用,并支持Linux、Windows和macOS系统。安装uv可以通过多种方式实现,例如使用Homebrew、Pacman、pip等。 step 1 安装uv:…...
ts中的类型
在 TypeScript 中,类型是静态类型系统的核心,用于在编译阶段检查代码的正确性。TypeScript 提供了丰富的类型系统,包括基本的原始类型、复合类型、以及用户自定义的类型。以下是对 TypeScript 中各种类型的详细分类和说明: 1. 原…...
把dll模块注入到游戏进程的方法_基于文件修改的注入方式
1、概述 本文主要是介绍两种基于文件修改的注入方式,一种是“DLL劫持”,另一种是“修改导入表”。这两种注入方式都是利用操作系统加载PE时的特点来实现的,我们在实现这两种注入方式时只需专注于注入dll的实现,而不用花费额外的精力去关注注入器的实现。要想深入了解这两种…...
判断点是否在多边形内
代码段解析: const intersect = ((yi > y) !== (yj > y)) && (x < (xj - xi) * (y - yi) / (yj - yi) + xi); 第一部分:(yi > y) !== (yj > y) 作用:检查点 (x,y) 的垂直位置是否跨越多边形的当前边。 yi > y 和 yj > y 分别检查边的两个端…...
【形式化验证基础】活跃属性Liveness Property和安全性质(Safety Property)介绍
文章目录 一、Liveness Property1、概念介绍2、形式化定义二、Safety Property1. 定义回顾2. 核心概念解析3. 为什么强调“有限前缀”4. 示例说明4.1 示例1:交通信号灯系统4.2 示例2:银行账户管理系统5. 实际应用的意义三. 总结一、Liveness Property 1、概念介绍 在系统的…...
Linux——信号(2)信号保存与捕捉
一、信号的保存 上次我们说到,捕捉一个信号后有三种处理方式:默认、忽略、自定义,其中自定义我们用signal系统调用完成,至于忽略信号,也需要signal实现,比如我现在想忽略2号信号,则:…...
Vue的模板编译过程
👨 作者简介:大家好,我是Taro,全栈领域创作者 ✒️ 个人主页:唐璜Taro 🚀 支持我:点赞👍📝 评论 ⭐️收藏 文章目录 前言一、编程范式的分类1.编程范式分为声明式和命令…...
空间应用中心AI4S空间科学实验研究成果发表于《中国科学院院刊》
编者寄语: 和鲸基于旗下数据科学协同平台ModelWhale赋能,助力了中国科学院空间应用工程与技术中心系统开展了基于空间科学实验领域的AI4S创新研究。中国科学院空间应用工程与技术中心在空间科学实验领域的研究覆盖了多模态空间科学实验数据模式挖掘、领…...
【Python网络爬虫开发】从基础到实战的完整指南
目录 前言:技术背景与价值当前技术痛点解决方案概述目标读者说明 一、技术原理剖析核心概念图解核心作用讲解关键技术模块技术选型对比 二、实战演示环境配置要求核心代码实现(10个案例)案例1:基础静态页面抓取案例2:动…...
乐家桌面纯净版刷机ROM下载 乐家桌面纯净版2025官方最新下载
还在苦苦寻找一款好用的电视桌面,为智能电视焕新体验?别在乐家桌面纯净版刷机 ROM 下载和官方最新版下载上纠结啦,试试乐看家桌面,给你带来意想不到的惊喜! 乐家桌面纯净版或许曾吸引过你,但乐看家桌面在众…...