【开发】中间件——RocketMQ
分布式消息系统
- RocketMQ概念,用途,特性
- 安装RocketMQ
- 掌握RocketMQ的api使用
- 对producer、consumer进行详解
- 了解RocketMQ的存储特点
- 简介及相关概念
- JavaAPI
- SpringBoot整合RocketMQ
- 消息的顺序收发
- 消息系统的事务、存储、重试策略
- 消息系统的集群
RocketMQ
RocketMQ简介
采用java开发的分布式消息系统,由阿里开发
地址:http://rocketmq.apache.org/
历史发展
- 阿里中间件,Notify用于交易核心信息的流转
- 2010年,B2B开始大规模使用ActiveMQ作为消息内核,急需支持顺序消息、拥有海量消息堆积能力的消息中间件——MetaQ 1.0 2011诞生
- 2012 年MetaQ发展到了3.0版本,抽象除了通用的消息引擎RorcketMQ
- 2015年,RocketMQ进过双十一,在可用性,可靠性和稳定性等方面都有出色表现。阿里消息中间件基于RocketMQ退出Aliware MQ1.0,开始为阿里云上的企业提供消息服务
- 2016年,RocketMQ进入Apache孵化
概念
-
Producer
- 消息生产者:生产消息,一般由业务系统负责产生消息
- Producer Group:一类Producer的集合名称,这类Producer通常发送同一类消息,且发送逻辑一致
-
Consumer
-
消费者:负责消费消息,一般由后台系统负责异步消费
-
分类:
- Push Consumer:消费端被动接收由服务端Push的消息
- Pull Consumer:消费端主动向服务端定时拉取消息
-
Consmer Group:一类Consumer的集合名称,这类Producer通常发送同一类消息,且发送逻辑一致
-
-
Broker
- RocketMQ的核心消息的发送、接收、高可用等
- 需要定时发送自身情况到NameServer,默认10s发送一次,超过2分钟会认为该broker失效
-
NameServer
- 集群中的组织协调员
- 收集broker的工作情况
- 不负责消息的处理
-
Topic【逻辑概念】
- 不同类型的消息以不同的Topic名称进行区分,如User、Order等
- Message Queue
- 消息队列,用于存储消息
下载部署
非docker
下载地址:https://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
cd /opt
unzip rocketmq-all-4.3.2-bin-release.zip
cd rocketmq-all-4.3.2-bin-release/# 启动nameserver
bin/mqnamesrv
#The Name Server boot success. serializeType=JSON
# 看到这个说明nameserver启动成功#启动broker
bin/mqbroker -n 8.140.130.91:9876 #-n指定nameserver地址和端口
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
启动错误,因为RocketMQ的配置默认是生产环境的配置,设置jvm的内存值比较大,需要调整默认值
#调整默认的内存大小参数
cd bin/
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m"#重新启动测试
bin/mqbroker -n 8.140.130.91:9876
The broker[iZ2zeg4pktzjhp9h7wt6doZ, 172.17.0.1:10911] boot success. serializeType=JSON and name server is 8.140.130.91:9876#启动成功
发送消息测试:
export NAMESRV_ADDR=127.0.0.1:9876
cd /opt/rocketmq-all-4.3.2-bin-release/bin
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息测试:
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
java api测试
依赖
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.2</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins>
</build>
测试代码
package com.rocketmq;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("test-group");//specify name server addressproducer.setNamesrvAddr("8.140.130.91:9876");//Lanuch the instanceproducer.start();for (int i = 0; i < 100; i++) {//create message instance ,specify topic,tag and message bodyMessage msg = new Message("TopicTest1",/*topic*/"TAGA",/*tag*/("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)/*message body*/);//Call send message to deliver message to one of brokers.SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}
发现报错
原因:
broker的ip地址是172.17.0.1,为私有ip,所以不可访问
解决:修改broker配置文件,指定broker 的ip地址
cd /opt/rocketmq-all-4.3.2-bin-release/conf
vim broker.confbrokerIP1=8.140.130.91
namesrvAddr=8.140.130.91:9876
brokerName=broker_haoke_im#启动broker,通过 -c 指定配置文件
cd /opt/rocketmq-all-4.3.2-bin-release/
bin/mqbroker -c /opt/rocketmq-all-4.3.2-bin-release/conf/broker.conf
API测试成功
通过docker部署
#拉取镜像
docker pull foxiswho/rocketmq:server-4.3.2
docker pull foxiswho/rocketmq:broker-4.3.2#创建nameserver容器
docker create -p 9876:9876 --name rmqserver \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq-data/rmqserver/logs:/opt/logs \
-v /data/rmq-data/rmqserver/store:/opt/store \
foxiswho/rocketmq:server-4.3.2#创建broker容器
#10911 生产者,消费者端口
#10909 搭建集群主从端口
docker create -p 10911:10911 -p 10909:10909 --name rmqbroker \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker/logs:/opt/logs \
-v /data/rmq-data/rmqbroker/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2#启动容器
docker start rmqserver rmqbroker#停止删除容器
docker stop rmqbroker rmqserver
docker rm rmqbroker rmqserver
broker配置文件
#broker名
brokerName=broker_haoke_im
#broker IP
brokerIP1=8.140.130.91
#当前broker托管的NameServer地址
namesrvAddr=8.140.130.91:9876
#开启自定义属性支持
enablePropertyFilter=true
部署RocketMQ的管理工具
UI管理工具,rocketmq-console,项目地址https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
#拉取镜像
docker pull apacherocketmq/rocketmq-console:2.0.0#创建并启动容器
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=8.140.130.91:9876 -Drocketmq.config.isVIPChannel=false" -p 8082:8080 -t apacherocketmq/rocketmq-console:2.0.0
访问:http://8.140.130.91:8082/
Java API基本使用
创建topic
package com.rocketmq;import org.apache.rocketmq.client.producer.DefaultMQProducer;public class TopicDemo {public static void main(String[] args) throws Exception{//设置NameServer地址DefaultMQProducer producer = new DefaultMQProducer("test-group");//设置producer 的NameServerAddressproducer.setNamesrvAddr("8.140.130.91:9876");//启动NameServerproducer.start();/** 创建topic* @param key broker name* @param newTopic topic name* @param queueNum topic's queue number* */producer.createTopic("broker_haoke_im","test_topic",8);System.out.println("topic创建成功");producer.shutdown();}
}
发送消息
消息的属性
字段名 | 默认 值 | 说明 |
---|---|---|
Topic | null | 必填,线下环境不需要申请,线上环境需要申请后才能使用 |
Body | null | 必填,二进制形式,序列化由应用决定,Producer 与 Consumer 要协商好 序列化形式。 |
Tags | null | 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只 支持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概 念 |
Keys | null | 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置 后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引, 请尽可能保证 key 唯一,例如订单号,商品 Id 等。 |
Flag | 0 | 选填,完全由应用来设置,RocketMQ 不做干预 |
DelayTimeLevel | 0 | 选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费 |
WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答。 |
同步
package com.rocketmq.message;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class SyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();String msgStr = "测试消息1";/** String topic, String tags, byte[] body* */Message message = new Message("test_topic","test",msgStr.getBytes("UTF-8"));SendResult result = producer.send(message);System.out.println(result);System.out.println("消息状态:" + result.getSendStatus());System.out.println("消息id:" + result.getMsgId());System.out.println("消息queue:" + result.getMessageQueue());System.out.println("消息offset:" + result.getQueueOffset());producer.shutdown();}
}
异步
与同步区别在于,回调函数的执行是滞后的,主程序是顺序执行的
package com.rocketmq.message;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class AsyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();String msgStr = "异步消息发送测试";/** String topic, String tags, byte[] body* */Message message = new Message("test_topic","test",msgStr.getBytes("UTF-8"));producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult result) {System.out.println(result);System.out.println("消息状态:" + result.getSendStatus());System.out.println("消息id:" + result.getMsgId());System.out.println("消息queue:" + result.getMessageQueue());System.out.println("消息offset:" + result.getQueueOffset());}@Overridepublic void onException(Throwable e) {System.out.println("消息发送失败");}});// producer.shutdown()要注释掉,否则发送失败。原因是,异步发送,还未来得及发送就被关闭了//producer.shutdown();}
}
消费信息
package com.rocketmq.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class ConsumerDemo {public static void main(String[] args) throws Exception{/** push类型的消费者,被动接收从broker推送的消息* */DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");consumer.setNamesrvAddr("8.140.130.91:9876");//订阅topic,接收此topic下的所有消息consumer.subscribe("test_topic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->"+msgs);/** 返回给broker消费者的接收情况* CONSUME_SUCCESS 接收成功* RECONSUME_LATER 延时重发* */return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
测试接收历史消息:
测试接收实时消息:
消息的订阅方式
可以通过tag区分不同类型
#生产者
Message message = new Message("test_topic","add",msgStr.getBytes("UTF-8"));#消费者
//完整匹配
consumer.subscribe("test_topic","add");
//或匹配
consumer.subscribe("test_topic","add || delete");
消息过滤器
RocketMQ支持根据用户自定义属性进行过滤 ,类似与SQL
MessageSelector.bySql(“age>=20 AND sex=‘女’”));
消息发送方:
package com.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;/*** @author Auspice Tian* @time 2021-04-04 15:10* @current example-roketmq-com.rocketmq.filter*/
public class SyncProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();String msgStr = "发送测试";Message msg = new Message("test_topic","test",msgStr.getBytes("UTF-8"));msg.putUserProperty("age","18");msg.putUserProperty("sex","女");SendResult result = producer.send(msg);System.out.println("消息状态"+result.getSendStatus());System.out.println("消息id"+ result.getMsgId());System.out.println("消息queue"+result.getMessageQueue());System.out.println("消息offset"+result.getQueueOffset());producer.shutdown();}
}
消息接收方:
package com.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException;
import java.util.List;public class ConsumerFilter {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");consumer.setNamesrvAddr("8.140.130.91:9876");consumer.subscribe("test_topic", MessageSelector.bySql("age>=20 AND sex='女'"));consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->"+msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
测试:
消息发送成功,但是由于不满足条件,被过滤器过滤,消费者未接收到
修改生产者自定义属性
Message msg = new Message("test_topic","test",msgStr.getBytes("UTF-8"));
msg.putUserProperty("age","21");
msg.putUserProperty("sex","女");
可以接收到消息
消息的顺序发送与接收
原理
消息的顺序收发,需要消费者与生产者二者配合
- 生产者发送的顺序消息都要放在同一消息队列中,才能保证被顺序取出
- 消费者接收的顺序消息,需要从同一队列中获取
生产者
package com.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class OrderProducer {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test-group");producer.setNamesrvAddr("8.140.130.91:9876");producer.start();for (int i = 0; i < 100; i++) {int orderId = i % 10;//生产10个订单的消息,每个订单10条消息String msgStr = "order-->"+i + " orderId-->" + orderId;Message message = new Message("test_topic","ORDER_MSG",msgStr.getBytes("UTF-8"));/** public SendResult send(Message msg, MessageQueueSelector selector, Object arg)* MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);* */SendResult sendResult = producer.send(message,(mqs,msg,arg)->{//匿名函数的作用为选择消息队列的idInteger id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);},//arg与orderId对应,orderId);System.out.println(sendResult);}producer.shutdown();}
}
消费者
public class OrderConsumer {public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-order-group");consumer.setNamesrvAddr("8.140.130.91:9876");consumer.subscribe("test_order_topic","*");consumer.registerMessageListener(new MessageListenerOrderly() {//顺序读取消息@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(Thread.currentThread().getName() + " "+ msg.getQueueId() + " "+ new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}
可见,订单id为3的消息,会存入同一消息队列,故在同一消息队列的消息可被同一消费线程监听
消息系统的事务
分布式事务分类:
- 基于单个JVM,数据库分库分表
- 基于多个JVM,服务拆分
- 基于多JVM,服务拆分且数据库分库分表
原理
Half(Prepare) Message
消息系统暂时不能投递的消息:发送方将消息发送到了MQ服务端。MQ服务端未收到生产者对消息的二次确认,此时该消息被标记为 暂不能投递状态 处于该状态的消息称为 半消息
Message Status Check
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ服务端发现某条消息长期处于 半消息,需要主动向消息生产者询问该消息的状态
-
发送方向MQ服务端发送消息
-
MQ Server将消息持久化成功后,向发送方ACK确认消息已经发送成功,此时消息为 半消息
-
发送方开始执行本地事务逻辑
-
发送方根据本地事务执行结果向MQ Server提交二次确认(Commit或Rollback),MQ Server 收到 Commit 则将半消息标记为 可投递,订阅方最终收到该消息;MQ Server收到 Rollback ,则删除该半消息,订阅方不会收到该消息
-
在断网或应用重启情况下,上述4提交的二次确认最终未到达MQ Server,经过固定时间后,MQ Server将对该消息发起消息回查
-
发送方收到消息回查,需要检查对应消息的本地事务执行的最终结果
-
发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server仍按4对半消息进行确认
生产者
package com.rocketmq.trancation;public class TrancationProducer {public static void main(String[] args) throws Exception{TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");producer.setNamesrvAddr("8.140.130.91:9876");//设置事务监听器producer.setTransactionListener(new TransactionImpl());producer.start();//发送消息Message message = new Message("pay_topic","用户A给用户B转钱".getBytes("UTF-8"));producer.sendMessageInTransaction(message,null);Thread.sleep(99999);producer.shutdown();}
}
本地事务处理
package com.rocketmq.trancation;public class TransactionImpl implements TransactionListener {private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();/*** 本地执行业务具体的逻辑* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {Thread.sleep(500);System.out.println("用户A账户减500");// System.out.println(1/0);System.out.println("用户B账户加500元.");Thread.sleep(800);//二次提交确认STATE_MAP.put(msg.getTransactionId(),LocalTransactionState.COMMIT_MESSAGE);return LocalTransactionState.COMMIT_MESSAGE;} catch (InterruptedException e) {e.printStackTrace();}//回滚STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);return LocalTransactionState.ROLLBACK_MESSAGE;}/*** 消息回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return STATE_MAP.get(msg.getTransactionId());}
}
消费者
package com.rocketmq.trancation;public class TransactionConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_transaction_consumer");consumer.setNamesrvAddr("8.140.130.91:9876");//订阅topic,接收消息consumer.subscribe("pay_topic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
测试
-
返回 commit 状态时,消费者能够接收消息
-
返回 rollback 状态时,消费者接收不到消息
-
消息回查测试
public class TransactionImpl implements TransactionListener {private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();/*** 本地执行业务具体的逻辑* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {System.out.println("用户A账户减500");Thread.sleep(500);// System.out.println(1/0);System.out.println("用户B账户加500元.");Thread.sleep(800);//二次提交确认STATE_MAP.put(msg.getTransactionId(),LocalTransactionState.COMMIT_MESSAGE);return LocalTransactionState.UNKNOW; // return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {e.printStackTrace();}//回滚STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);return LocalTransactionState.ROLLBACK_MESSAGE;}/*** 消息回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("状态回查-->"+ msg.getTransactionId() + " "+ STATE_MAP.get(msg.getTransactionId()));return STATE_MAP.get(msg.getTransactionId());} }
Consumer
Push和Pull模式
- push模式:客户端与服务端建立连接后,当服务端有消息,将消息推送到客户端
- pull模式:客户端不断的轮询请求服务端,来获取新的而消息
push模式需要消息系统与消费端之间建立长连接,对消息系统是很大的负担,所以在具体实现时,都采用消费端主动拉取的方式,即consumer轮询从broker拉取消息
在RocketMQ中,push与pull的区别
Push:
DefaultPushConsumer
将轮询过程都封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener监听器的consumeMessage()来消费,对用户而言,感觉消息是被推送来的。Pull:取消息过程需要自己写:首先从目标topic中拿到MessageQueue集合并遍历,然后针对每个MessageQueue批量取消息。一次Pull,都要记录该队列的offset,知道去完MessageQueue,再换另一个
长轮询保证Pull的实时性
长轮询(长连接+轮询),客户端像传统轮询一样从服务端请求数据,服务端会阻塞请求不会立刻返回,直到有数据或超时才返回给客户端,然后关闭连接,客户端处理完响应信息后再向服务器发送新的请求
消息模式
DefaultMQPushConsumer实现了自动保存offset值及多个consumer的负载均衡
//设置组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("HAOKE_IM");
通过 groupname
将多个consumer组合在一起,会存在消息的分配问题(消息是发送到组还是每个消费者)
-
集群模式(默认)
同一个ConsumerGroup里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的
-
广播模式
同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,一个消息会被分发多次,被多个Consumer消费
// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
重复消息的解决方案
重复消息的产生情况:
-
生产者不断发送重复消息到消息系统
-
网络不可达 :只要通过网络交换数据,就无法避免这个问题
由于接收到重复消息不可避免,问题变为 消费端收到重复消息,怎么处理
-
消费端处理消息的业务逻辑保持幂等性
幂等性:无论执行多少次,结果都一样
eg:while s!=1;在执行sql语句
-
保证每条消息都有唯一编号且保证消息处理成功与去重的日志同时出现
利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息
如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是 RocketMQ不解决消息重复的问题 的原因
RocketMQ存储
RocketMQ中的消息数据存储,采用了零拷贝技术(mmap + write方式),文件系统采用 Linux Ext4文件系统进行存储。
消息数据的存储
在RocketMQ中,消息数据是保存在磁盘文件中的,使用RocketMQ尽可能保证顺序写入,比随机写入效率高很多
-
ConsumeQueue:索引文件,存储数据指向物理文件的位置
-
CommitLog是真正存储数据的文件
-
消息主体及元数据都存储在CommitLog中
-
Consume Queue 是一个逻辑队列,存储了这个Queue在CommitLog中的其实offset、log大小和MessageTag的hashcode
-
每次读取消息队列先读取ConsumerQueue,然后再通过consumerQueue中拿到消息主体
同步刷盘和异步刷盘
RocketMQ为提高性能,会尽可能保证磁盘的顺序读写。消息通过Producer写入RocketMQ的时候,有两种写磁盘方式,分别是同步刷盘与异步刷盘
- 同步刷盘——安全性
- 在返回写成功状态时,消息已经写入磁盘
- 执行流程:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
- 异步刷盘——效率
- 在返回写成功状态时,消息可能只是被写入内存的PAGECACHE,写操作的返回快,吞吐量大
- 当内存里的消息积累到一定程度,统一触发写磁盘动作,快速写入
修改刷盘方式
broker.conf
flushDiskType=ASYNC_FLUSH——异步
flushDiskType=SYNC_FLUSH——同步
重试策略
重试情况分析
在消息的发送和消费过程中,都有可能出现错误,如网络异常等,出现了错误就需要进行错误重试,这种消息的重试分为 producer端的重试 和 consumer端重试
producer端重试
- 指定重试次数
- 指定超时时间
//消息发送失败时,重试3次
producer.setRetryTimesWhenSendFailed(3);// 发送消息,并且指定超时时间
SendResult sendResult = producer.send(msg, 1000);
- 只有同步生产者才会进行错误重试。
- 只有特定异常才会重试;
- 设置的超时时间小于实际执行时间,则不会进行重试
#DefaultMQProducerImpl
//设置发送总次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;for (; times < timesTotal; times++) {try{if (timeout < costTime) {callTimeout = true;break;}}catch (RemotingException e) {...continue;}catch (MQClientException e) {...continue;}catch (MQBrokerException e){switch (e.getResponseCode()) {case ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;}}
}
consumer端重试
- 消息处理的异常失败
- 消息超时接收的超时失败
异常重试
消息正常到了消费者端,处理失败,发生异常。eg:反序列化失败,消息数据本身无法处理
消息状态
package org.apache.rocketmq.client.consumer.listener;public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;
}
broker的启动日志
INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果消息消费失败即broker收到 RECONSUME_LATER ,则broker会对消息进行重试发送,直至2h
演示:
public class ConsumerDemo {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");consumer.setNamesrvAddr("8.140.130.91:9876");// 订阅topic,接收此Topic下的所有消息consumer.subscribe("test_error_topic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->" + msgs);if(msgs.get(0).getReconsumeTimes() >= 3){// 重试3次后,不再进行重试return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}
}
重试消息和原始发送消息不是同一条
timeout
由于消息没有从MQ发送到消费者上,那么在MQ Server内部会不断的尝试发送这条消息,直至发送成功位置
也就是,服务端没有接收到消费端发来的消息的反馈,定义为超时
RocketMQ的集群
集群模式
单个Master
- 风险较大,一旦Broker重启或者宕机,会导致整个服务不可用,只做开发环境
多Master
- 一个集群无Slave,全是Master,例如2个Master或者3个Master
- 单台机器宕机,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性受到影响
多Master多Slave,异步复制
- 每个Master配置一个Slave,有多个Master-Slave对,HA(双机集群系统)采用异步复制方式,主备有短暂消息延迟,毫秒级
- 优点:即使磁盘损坏,丢失的消息非常少,实时性不会收到影响,消费者仍可从Slave消费,此过程对应用透明,不需人工干预,性能同多Master模式一样
- 缺点:Master宕机或磁盘损坏,会丢失少量消息
多Master多Slave,同步双写
- 每个Master配置一个Slave,有多个Master-Slave对,HA(双机集群系统)采用同步双写方式,主备都写成功,向应用返回成功
- 优点:数据与服务无单点,Master宕机情况下,消息无延迟,服务可用性和数据可用性非常高
- 缺点:性能比异步复制模式低
搭建2m2s集群
创建2个NameServer(master)
#nameserver1
docker create -p 9876:9876 --name rmqserver01 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq-data/rmqserver01/logs:/opt/logs \
-v /data/rmq-data/rmqserver01/store:/opt/store \
foxiswho/rocketmq:server-4.3.2#nameserver2
docker create -p 9877:9876 --name rmqserver02 \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-v /data/rmq-data/rmqserver02/logs:/opt/logs \
-v /data/rmq-data/rmqserver02/store:/opt/store \
foxiswho/rocketmq:server-4.3.2
搭建broker(2master)
#broker01配置文件
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker01
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11911#master broker01
docker create --net host --name rmqbroker01 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker01/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker01/logs:/opt/logs \
-v /data/rmq-data/rmqbroker01/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
-
brokerId:0表示主,>0表示Slave
-
fileReservedTime:消息保存时间 单位——h
-
deleteWhen:什么是时候对过期消息清理 24小时制
-
brokerRole:[同步双写|异步双写]_[主] | [从]
[SYNC|ASYNC_MASTER] | [SLAVE]
-
flushDiskType:刷盘方式 [同步|异步]_FLUSH
[SYNC|ASYNC_FLUSH]
-
brokerIP1:访问broker的ip地址
-
brokerIP2:主从同步的ip
-
listenPort:与客户端交互的端口(+1,-2)
#broker02配置文件
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker02
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11811#master broker02
docker create --net host --name rmqbroker02 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker02/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker02/logs:/opt/logs \
-v /data/rmq-data/rmqbroker02/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
搭建从broker(slave)
#slave broker01配置文件
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker01
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11711#slave broker01
docker create --net host --name rmqbroker03 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker03/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker03/logs:/opt/logs \
-v /data/rmq-data/rmqbroker03/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#slave broker02配置文件
namesrvAddr=8.140.130.91:9876;8.140.130.91:9877
brokerClusterName=HaokeCluster
brokerName=broker02
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
brokerIP1=8.140.130.91
brokerIp2=8.140.130.91
listenPort=11611#slave broker02
docker create --net host --name rmqbroker04 \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m -Xmn128m" \
-v /data/rmq-data/rmqbroker04/conf/broker.conf:/etc/rocketmq/broker.conf \
-v /data/rmq-data/rmqbroker04/logs:/opt/logs \
-v /data/rmq-data/rmqbroker04/store:/opt/store \
foxiswho/rocketmq:broker-4.3.2
#启动容器
docker start rmqserver01 rmqserver02
docker start rmqbroker01 rmqbroker02 rmqbroker03 rmqbroker04
测试
生产者
public class SyncMessage {public static void main(String[] args) throws Exception{DefaultMQProducer producer = new DefaultMQProducer("test_cluster_group");producer.setNamesrvAddr("8.140.130.91:9876;8.140.130.91:9877");producer.start();String msgStr = "Cluster测试消息";/** String topic, String tags, byte[] body* */Message message = new Message("test_cluster_topic","CLUSTER",msgStr.getBytes("UTF-8"));SendResult result = producer.send(message);System.out.println(result);System.out.println("消息状态:" + result.getSendStatus());System.out.println("消息id:" + result.getMsgId());System.out.println("消息queue:" + result.getMessageQueue());System.out.println("消息offset:" + result.getQueueOffset());producer.shutdown();}
}
消费者
public class ConsumerDemo {public static void main(String[] args) throws Exception{/** push类型的消费者,被动接收从broker推送的消息* */DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_cluster_group");consumer.setNamesrvAddr("8.140.130.91:9876;8.140.130.91:9877");//订阅topopic,接收此topic下的所有消息consumer.subscribe("test_cluster_topic","*");consumer.registerMessageListener(new MessageListenerConcurrently() {//并发读取消息@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {System.out.println(new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}System.out.println("收到消息->"+msgs);/** 返回给broker消费者的接收情况* CONSUME_SUCCESS 接收成功* RECONSUME_LATER 延时重发* */return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
SpringBoot整合RocketMQ
下载依赖
由于rocketMQ没有发布到Mven中央仓库,需要自行下载源码,并载入到本地Maven仓库
#源码地址
https://hub.fastgit.org/apache/rocketmq-spring#进入源码目录,执行
mvn clean install
导入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.4.3</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.2</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins>
</build>
application.properties
#Spring boot application
spring.application.name = test-rocketmq
spring.rocketmq.nameServer=8.140.130.91:9876
spring.rocketmq.producer.group=test_spring_producer_group
基本使用
生产者发送消息
package com.rocketmq.spring;@Component
public class SpringProducer {//注入rocketmq模板@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** @param topic* @param msg*/public void sendMsg(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);}
}
启动类
package com.rocketmq;@SpringBootApplication
public class MyApplication {public static void main(String[] args) {SpringApplication.run(MyApplication.class,args);}
}
测试生产消息
package com.rocketmq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {@AutowiredSpringProducer producer;@Testpublic void testSendMsg(){String msg = "第二个Spring RocketMq 消息";this.producer.sendMsg("test_spring_topic",msg);System.out.println("发送成功!");}}
消费者消费消息
package com.rocketmq.spring;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "test_spring_topic",consumerGroup = "test_spring_consumer_group",selectorExpression = "*",consumeMode = ConsumeMode.CONCURRENTLY
)
public class SpringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String msg) {System.out.println("收到消息->"+msg);}
}
事务消息
生产者
package com.rocketmq.spring.transaction;@Component
public class TransactionProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 发送消息** @param topic* @param msg*/public void sendMsg(String topic,String msg){Message message = (Message) MessageBuilder.withPayload(msg).build();//此处的txProducerGroup与事务监听器的@RocketMQTransactionListener(txProducerGroup = "")一致this.rocketMQTemplate.sendMessageInTransaction("test_tx_producer_group",topic,message,null);System.out.println("消息发送成功");}
}
生产者监听器
package com.rocketmq.spring.transaction;@RocketMQTransactionListener(txProducerGroup = "test_tx_producer_group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {private static Map<String,RocketMQLocalTransactionState> STATE_MAP = new HashMap<>();/*** 执行本地事务** @param message* @param o* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try {System.out.println("执行操作1");Thread.sleep(500L);System.out.println("执行操作2");Thread.sleep(500L);STATE_MAP.put(transactionId,RocketMQLocalTransactionState.COMMIT);return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){e.printStackTrace();}STATE_MAP.put(transactionId,RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.ROLLBACK;}/*** 消息回查** @param message* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);System.out.println("回查消息->transactionId = "+transactionId+",state = "+STATE_MAP.get(transactionId));return STATE_MAP.get(transactionId);}
}
消息生产测试
@Test
public void testSendTransactionMsg(){String msg = "事务消息测试!";this.transactionProducer.sendMsg("test_spring_transaction_topic",msg);System.out.println("发送成功");
}
消费者测试
package com.rocketmq.spring.transaction;@Component
@RocketMQMessageListener(topic = "test_spring_transaction_topic",consumeMode = ConsumeMode.CONCURRENTLY,selectorExpression = "*",consumerGroup = "test_tx_consumer_group"
)
public class TransactionConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("收到消息->"+s);}
}
消息回查测试
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);try {System.out.println("执行操作1");Thread.sleep(500L);System.out.println("执行操作2");Thread.sleep(500L);STATE_MAP.put(transactionId,RocketMQLocalTransactionState.COMMIT);return RocketMQLocalTransactionState.UNKNOWN;}catch (Exception e){e.printStackTrace();}STATE_MAP.put(transactionId,RocketMQLocalTransactionState.ROLLBACK);return RocketMQLocalTransactionState.ROLLBACK;
}
相关文章:
FICO内部订单详细解析
内部订单是用来对企业内部某项工作或者任务编制计划、归集成本、结算的载体。比如市场推广活动、内部团队活动、研发项目、投资项目、在建工程项目等。内部订单需要区别于销售订单、采购订单和生产订单。销售订单和采购订单是企业与外部单位以合同或者契约为纽带,在执行交易后…...
memset
memset函数及其用法 include <string.h> void *memset(void *s, int c, unsigned long n);函数的功能是: 将指针变量 s 所指向的前 n 字节的内存单元用一个“整数” c 替换,注意 c 是 int 型。s 是 void* 型的指针变量,所以它可以为任何类型的数据进行初始化。...
电商技术揭秘二十八:安全与合规性保障
电商技术揭秘相关系列文章(上) 相关系列文章(中) 电商技术揭秘二十:能化供应链管理 电商技术揭秘二十一:智能仓储与物流优化(上) 电商技术揭秘二十二:智能仓储与物流优化(下) 电商技术揭秘二十三:智能…...
循环神经网络(RNN):概念、挑战与应用
循环神经网络(RNN):概念、挑战与应用 1 引言 1.1 简要回顾 RNN 在深度学习中的位置与重要性 在深度学习的壮丽图景中,循环神经网络(Recurrent Neural Networks,RNN)占据着不可或缺的地位。自从…...
NGINX Ingress Controller 设置未配置过的域名增加默认路由
背景 k8s 集群对应的公网 slb ip 经常被人绑定域名,监控侧经常会收集到 502 相关状态码的异常告警,着手处理这种bad case策略 1. 所有没有在ingress 配置过的域名要进行处理,即不是公司的、非法绑定到slb 上的域名要加上一条策略 2. NGINX Ingress Controller 设置未配置过…...
IIC总线读取温度湿度传感器数据实验
iic.c #include "iic.h"extern void printf(const char* fmt, ...); /** 函数名 : delay_us* 函数功能:延时函数* 函数参数:无* 函数返回值:无* */ void delay_us(void) //微秒级延时 {unsigned int i 2000;while(i-…...
【开发】中间件——RocketMQ
分布式消息系统 RocketMQ概念,用途,特性安装RocketMQ掌握RocketMQ的api使用对producer、consumer进行详解了解RocketMQ的存储特点 简介及相关概念JavaAPISpringBoot整合RocketMQ消息的顺序收发消息系统的事务、存储、重试策略消息系统的集群 RocketMQ R…...
Error: error:0308010C:digital envelope routines::unsupported
vscode运行react时报错 opensslErrorStack: [ ‘error:03000086:digital envelope routines::initialization error’ ], library: ‘digital envelope routines’, reason: ‘unsupported’, code: ‘ERR_OSSL_EVP_UNSUPPORTED’ 什么原因导致 “0308010c:digital envelope r…...
【第017问 Unity Physics.OverlapSphere如何检测附近玩家?】
一、背景 如何检测一个对象范围内的玩家,这个可以直接使用距离判定,物体射线检测等相关方式;这里采用Physics.OverlapSphere的方式来实践其过程,并对Physics.OverlapSphere的使用做一下记录; 二、Physics.OverlapSph…...
C++ MVC模式
概述 C是一种流行的编程语言,它可以用于构建各种类型的应用程序,包括Web应用程序、桌面应用程序和移动应用程序。在这里,我将为您介绍C中的MVC模式,以及如何在C中实现MVC模式。 MVC(Model-View-Controller࿰…...
插入排序(C++)
算法思想:插入排序的工作原理是通过构建有序序列,对于未排序的数据,在已排序的序列中从后向前扫描,找到相应的位置并插入。插入排序在实现上,通常采用in-place排序,因而在从后向前扫描的过程中,…...
服务端(三) node.js 主要的核心模块
// 核心模块,是node中自带的模块,可以在node中直接使用 // window 是浏览器的宿主对象,node中是没有的 // global 是node中的全局对象,作用类似于window // ES标准下,全局对象的标准名应该是 globalThis /* 核心模块…...
Libtorch的安装与介绍
1.背景众所周知,现在提到深度学习就离不开PyTorch。但其实PyTorch从更广泛的意义上来说,也只是Torch的Python接口而已。只是大家现在都习惯用Python写代码,所以PyTorch比较火。但是不要忘了Torch其实还有C的接口,名字叫libtorch。…...
二叉树(堆)
目录一、什么是堆?二、堆的实现2.1 结构体变量的声明2.2 堆的初始化2.3 堆的销毁2.4 插入数据2.5 删除数据2.6 堆内有效数据的数目2.7 取堆顶元素2.8 判断堆是否为空2.9 代码汇总三、经典“TopK”问题一、什么是堆? heap 是一个抽象的数据结构ÿ…...
从矩阵理论角度理解偏最小二乘回归,以及在脑科学中(脑影像与行为、基因表达的关系)的应用举例
偏最小二乘法 (PLS) 是一种多元数据驱动的统计技术,旨在提取表示最大大脑行为关联的潜在变量(或潜在成分 latent components [LC])。 从矩阵理论角度理解偏最小二乘回归,以及在脑科学中(脑影像与行为、基因表达的关系)的应用举例 矩阵理论角度理解偏最小二乘回归偏最小二…...
dp 就 dp ,数位dp是什么意思 ?
💧 dp 就 dp ,数位dp是什么意思 ?💧 🌷 仰望天空,妳我亦是行人.✨ 🦄 个人主页——微风撞见云的博客🎐 🐳 数据结构与算法专栏的文章图文并茂🦕生动…...
MySQL面试必看
1.MySQL中的索引用的是什么数据结构 Innodb使用B树数据结构 1.Hash表:等值查询效率比较高、但是不支持范围查询。 2.二叉树:时间复杂度log2n 缺点:有可能产生不平衡 类似于链表的结构 时间复杂度为o(n)。 3.平衡二叉树avl/红黑树:…...
2023年全国最新道路运输从业人员精选真题及答案34
百分百题库提供道路运输安全员考试试题、道路运输从业人员考试预测题、道路安全员考试真题、道路运输从业人员证考试题库等,提供在线做题刷题,在线模拟考试,助你考试轻松过关。 28.根据《放射性物品运输安全管理条例》规定,运输放…...
python -- 科研论文海洋气象科学绘图的配色汇总
海洋气象科学科研绘图中常用的配色[1、ColorBrewer 彩色地图,默认情况下包含在 matplotlib 中](https://colorbrewer2.org/#typesequential&schemeBuGn&n3)[2、proplot package 自带的色系](https://proplot.readthedocs.io/en/latest/colormaps.html#ug-pe…...
Prometheus监控实战系列二: 安装部署
Prometheus支持多种操作系统,例如Linux、Windows和Max OSX等。在产品官网上提供了独立的二进制文件进行下载,可下载对应的tar包并在相应系统的服务器上进行安装部署。当然,做为与容器有着紧密联系的监控系统,Promethesu也可以很方…...
欧莱雅校招负责人张泽宇:拥抱Z世代,探索新玩法
作为校招HR,你在雇主品牌创新实践的路上做过什么尝试? 2020年,欧莱雅正式推出了全新的雇主品牌价值主张 —— 敢为敢超越,就是欧莱雅(Freedom to go beyond, thats the beauty of L’ORAL),鼓励…...
安装python教程并解决Python安装完没有Scripts文件夹问题
安装python教程 并解决Python安装完没有Scripts文件夹问题 ** 一背景 **首先要了解这个出现的原因是下载安装的版本问题 系統是32 bit 的版本还是 64bit 的 web-based: 透过网络安装的,就是执行安装后才透过网络下载python executable: 可執行文件的ÿ…...
牛客论坛项目总结
目录 1.请简要介绍一下你的项目? 1.如何实现项目的注册问题 2.项目如何实现用户唯一性检验 3.登录状态保存在哪 4.用户登陆上之后怎么显示登录页面 5.拦截器(Interceptor) 6.ThreadLocal(线程安全) 7.md5原理知…...
【Python学习笔记】b站@同济子豪兄 用pytorch搭建全连接神经网络,对Fashion-MNIST数据集中的时尚物品进行分类
【Python学习笔记】原作b站同济子豪兄 用pytorch搭建全连接神经网络,对Fashion-MNIST数据集中的时尚物品进行分类 跟着b站同济子豪兄的视频自学写的代码,内容是用pytorch搭建全连接神经网络,对Fashion-MNIST数据集中的时尚物品进行分类 视频…...
2年功能测试月薪9.5K,100多天自学自动化,跳槽涨薪4k后我的路还很长...
前言 其实最开始我并不是互联网从业者,是经历了一场六个月的培训才入的行,这个经历仿佛就是一个遮羞布,不能让任何人知道,就算有面试的时候被问到你是不是被培训的,我还是不能承认这段历史。我是为了生存,…...
【IoT 毕业设计】Ruff硬件+阿里云IoT+微信小程序构建环境监控系统
0.技术架构 IoT 物联网毕业设计实战采用 Ruff 开发板,串口连接温湿度传感器DHT11和空气质量传感器SDS011,每5分钟采集一次数据,通过MQTT协议发送到阿里云 IoT 物联网平台,写入云端的设备影子中。微信小程序调用阿里云函数计算FC…...
【VUE3】计算属性及其缓存特性
计算属性 基础示例 模板中的表达式虽然方便,但也只能用来做简单的操作。如果在模板中写太多逻辑,会让模板变得臃肿,难以维护。比如说,我们有这样一个包含嵌套数组的对象: const author reactive({name: John Doe,b…...
【计算机网络】从输入网址到网页显示,期间发生了什么?
【计算机网络】从输入网址到网页显示,期间发生了什么? 接下来以下图较简单的网络拓扑模型作为例子,探究探究其间发生了什么? 文章目录【计算机网络】从输入网址到网页显示,期间发生了什么?一:孤…...
【vue2】近期bug收集与整理01
🥳博 主:初映CY的前说(前端领域) 🌞个人信条:想要变成得到,中间还有做到! 🤘本文核心:记录博主在vue2中遇到过的坑,本文是博主的学习使用总结 目录 1登陆token的问…...
JSON和AJAX
JSON JSON(JavaScript Object Notation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。JSON采用完全独立于语言的文本格式,而且很多语言都提供了对json的支持(包括C,C,C#,Java,JavaScript…...
Python入门到精通【精品】第六章 - 函数
Python入门到精通【精品】第六章 - 函数 1. 如何理解函数2. 函数的定义3. 函数的使用3.1. 函数的调用3.2. 实参与形参3.3. 函数的返回3.4. 完整的函数设计3.5. 位置参数和关键参数1. 如何理解函数 当你第一次接触到“函数”这个概念的时候,你肯定会不由自主的联想到数学里面也…...
春招大盘点:找工作除了招聘网站还有哪些渠道?
又是一年毕业季,估计同学们都正在写论文、找工作两头忙,很多同学和小C“诉苦”说现在找实习的渠道太少了,招聘网站都刷完了,也没看到很合适的岗位。那找工作除了招聘网站还有什么渠道呢?其实是有的,今天就为…...
雷电4模拟器安装xposed框架(2022年)
别问我都2202年了为什么还在用雷电4安卓7。我特么哪知道Xposed的相关资料这么难找啊,只能搜到一些老旧的资料,尝试在老旧的平台上实现了。 最初的Xposed框架现在已经停止更新了,只支持到安卓8。如果要在更高版本的安卓系统上使用Xposed得看看…...
Gartner发布CNAPP市场指南 腾讯云为国内唯一入选云厂商
近日,国际研究机构Gartner发布《Market Guide for Cloud-Native Application Protection Platforms》(《云原生应用保护平台(CNAPP)市场指南》)(以下简称《市场指南》),腾讯云凭借集…...
数字藏品应用场景分析
数字藏品应用场景广泛,个人资料图片(PFP)元宇宙、艺术收藏、游戏、体育、文物、音乐等等都可以上链,以数字藏品的形式发行。国际市场中,个人资料图片占大多数,国内多以艺术收藏、文物藏品等为主。 数字藏品…...
spring boot项目:实现与数据库的连接
步骤【写在前面】定义数据库连接信息:引入数据库驱动:创建数据源:创建JdbcTemplate:编写DAO层:使用Service注解标注Service层:使用RestController注解标注Controller层:示例代码:app…...
解析vue中的process.env
一、介绍 1、process process是 nodejs 下的一个全局变量,它存储着 nodejs 中进程有关的信息。 2、process.env env 是 environment 的简称,process.env属性返回一个包含用户环境的对象。 3、dotenv Dotenv 是一个零依赖的模块,它能将环境变…...
ESP32 开启 Wi-Fi 热点与手机端 Iperf 测试 APP 来测试 ESP32 Wi-Fi AP 速率的流程
# 测试需求: ESP32 开启 WiFi AP Server 模式手机连接 ESP32 WiFi AP 热点通过手机端 Iperf 测试 APP 测试 ESP32 WiFi 热点的 Iperf 速率 测试用例: 可以基于 “esp-idf/examples/wifi/iperf” 例程进行测试。ESP32 设备下载 Iperf 例程后࿰…...
msfconsole之制作windows木马并成功获取shell
msfconsole之制作windows木马并成功获取shell 一、工具简介 msfconsole 简称 msf 是一款常用的安全测试工具,包含了常见的漏洞利用模块和生成各种木马,其提供了一个一体化的集中控制台,通过msfconsole,你可以访问和使用所…...
【小杨带你玩转C语言】(入门篇)初识C语言(下)
本章目录 每篇前言1.导语 2.目标 3.知识点 一,常见关键字 1,认识关键字 2,关键字分类 2.1,数据类型关键字 2.1.1,基本数据类型关键字 2.…...
一文快速回顾 Java 操作数据库的方式-JDBC
前言 数据库的重要性不言而喻,不管是什么系统,什么应用软件,也不管它们是 Windows 上的应用程序,还是 Web 应用程序,存储(持久化)和查询(检索)数据都是核心的功能。 大…...
92年程序员发帖晒薪资称自己很迷茫,网友:老弟你可以了
当下,是一个“向钱看,向厚赚”的社会。快节奏的生活下,家庭、工作各方面压力很容易使年轻人陷入迷茫和焦虑。 与其他行业相比,程序员的高薪让人羡慕。那么,对于那些真正达到这么多收入的人来说,他们是怎么…...
太敢说了,编程如果这么自学,培训班都得倒闭,直接省去上万元的学费
写了20多年的代码,之前做过阿里的高级架构师,在技术这条路上跌跌撞撞了很多,我今天分享一些我个人的自学方法给各位。现在在网上报个正经点的班得花几千块钱,线下就更夸张,都是万元起步,我的这些学习方法如果你能用好&…...
别急着给中国版ChatGPT唱赞歌:“追风者”无缘“星辰大海”
文心一言发布十余天后,争论仍未有止歇的迹象。 有人给出了“拉垮”的评价,相比于多轮迭代的ChatGPT,文心一言在逻辑推理、多轮对话等方面的表现不尽如人意;也有人认为给文心一言值得肯定,原因是填补了中文互联网的空白…...
异常:Error和Exception
异常机制(Exception) 什么是异常 实际工作中,遇到的情况不可能是非常完美的。比如:你写的某个模块,用户输入不一定符合你的要求、你的程序要打开某个文件,这个文件可能不存在或者文件格式不对,…...
Python满屏表白代码
目录 前言 爱心界面 无限弹窗 前言 人生苦短,我用Python!又是新的一周啦,本期博主给大家带来了一个全新的作品:满屏表白代码,无限弹窗版!快快收藏起来送给她吧~ 爱心界面 def Heart(): roottk.Tk…...
Unity --- Transform类
1.一个很有意思的事实是Transform类不仅用来管理游戏物体的位置缩放旋转,还用来管理游戏物体的父物体与子物体之间的关系 当游戏物体A的trasnform类a是游戏物体B的transform类b的父类的话,游戏物体A就是游戏物体B的父物体 2.如何访问脚本当前挂载的游戏…...
ImportError: /usr/lib/x86_64-linux-gnu/libstdc++.so.6: version `GLIBCXX_3.4.29‘ not found
Bug描述 今天主要解决一个 Bug:libstdc.so.6: version GLIBCXX_3.4.29 not found 主要是和 libstc版本问题相关,找了很多方法,其他很多方法都是直接修改libstc.so的版本,但是直接修改这种可能被多个共享库依赖的库版本将会牵一发…...
Unity IL2CPP 游戏分析入门
一、目标 很多时候App加密本身并不难,难得是他用了一套新玩意,天生自带加密光环。例如PC时代的VB,直接ida的话,汇编代码能把你看懵。 但是要是搞明白了他的玩法,VB Decompiler一上,那妥妥的就是源码。 U…...
设置鼠标右键打开方式,添加IDEA的打开方式
一、问题描述 已下载IDEA,但是右键打开之前保存的项目文件,无法显示以IDEA方式打开。 二、解决步骤 1. 打开注册表 winR键输入regedit 2、查找路径为计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Classes\Directory\shell (我找了半天没看到Class…...
手机(Android)刷NetHunter安装指南,无需ssh执行kali命令, NetHunter支持的无线网卡列表!
一、安装NetHunter 前提:确保手机已经root,已装上magisk。如果没有root,可用尝试magisk root 后执行此文 1、下载Nethunter:Get Kali | Kali Linux 然后push 到sdcard 里, 2、打开magisk,选择刚刚下好的…...
Maven和Eclipse联合开发
Maven和Eclipse联合开发 java list 对象个数 size java List 取第一个对象.get(0) baseCrmSpecialclient.get(0).getFxid() System.out.print 换行 System.out.print(item.getCode()"\r\n"); java for循环用法 https://blog.csdn.net/rank/list/total Java for-ea…...
宝塔面板部署node+vue项目注意事项
宝塔面板部署nodevue项目注意事项 宝塔连接云服务器 如果服务器上没有安装宝塔面板,需要先安装,安装流程如下: 从宝塔官网主页进去,点击下载安装,然后点击在线安装 输入服务器IP和密码在服务器上安装宝塔面板 等待一…...
MATLAB | R2023a更新了哪些好玩的东西
R2023a来啦!!废话不多说看看新版本有啥有趣的玩意和好玩的特性叭!!把绘图放最前面叭,有图的内容看的人多。。 1 区域填充 可以使用xregion及yregion进行区域填充啦!! x -10:0.25:10; y x.^…...
MySQL对表操作
目录 CRUD 增加(Create) 查询(Retrieve) 全列查询 指定列查询 查询字段为表达式 别名 去重:DISTINCT 排序:ORDER BY 条件查询:WHERE 逻辑运算符: 修改(Update) 删除&…...
Downie 4 4.6.12 MAC上最好的一款视频下载工具
Downie for Mac 简介 Downie是Mac下一个简单的下载管理器,可以让您快速将不同的视频网站上的视频下载并保存到电脑磁盘里然后使用您的默认媒体播放器观看它们。 Downie 4 Downie 4 for Mac Downie 4 for Mac软件特点 支持许多站点 -当前支持1000多个不同的站点&…...
vue 组件间通信方式
目录 1、props传递数据(父 → 子) 2、v-model(双向绑定) 3、.sync(双向绑定) 4、ref(使用 ref 属性获取子组件的实例或 DOM 元素) 5、$emit / v-on(子组件向父组件发…...
Python 数据结构和算法实用指南(三)
原文:zh.annas-archive.org/md5/66ae3d5970b9b38c5ad770b42fec806d 译者:飞龙 协议:CC BY-NC-SA 4.0 第七章:哈希和符号表 我们之前已经看过数组和列表,其中项目按顺序存储并通过索引号访问。索引号对计算机来说很有效…...
数组索引哈希表(Array Indexed Hash Table)
数组索引哈希表(Array Indexed Hash Table) 哈希表:散列表,通过建立key与value之间的映射关系,实现高效的元素查询。 哈希表,数组,链表的查询效率 数组: 查询效率:对于已知索引的位…...
React Flow浏览器默认事件失效问题解决
前情提要 React Flow可以使用滑轮来实现对于该部分区域的放大和缩小,并且自动拦截浏览器默认的滑轮和滑轮按键组合事件,如:Ctrl鼠标滑轮事件。那么这就导致在非该区域的地方使用了浏览器默认的滑轮事件且改变了原有页面的大小时,…...
【Node.js】Node.js的安装与配置
目录 1. 下载 Node.js 2. 安装 Node.js 3. 验证安装 4. 配置 npm 5. 环境变量配置(可选) 6. 验证配置 安装和配置 Node.js 主要涉及以下几个步骤: 1. 下载 Node.js 首先,你需要从 Node.js 官方网站(https://no…...
gazebo中vins-fusion在仿真小车上的部署
软件要求:Ubuntu 20.04 ros的noetic版本,我是在虚拟机vitrualbox上运行的 这几天在学ROS,跟着赵虚左老师过了一遍之后,感觉还是有很多不懂的地方,xtdrone上仿真跟着文档走了一遍,好像没学到什么东西&#…...