简述Apache RocketMQ
整体架构分析
基本流程
模块特性
发送消息流程原理分析
同步发送 sync
异步发送 async
直接发送 one-way
主从同步(HA)机制分析
消息投递
持久化机制
RocketMQ的RPC通信
RocketMQ中Remoting通信模块的具体实现
消息的协议涉及与编码解码
消息的通信方法和通信流程
Client发送请求消息的具体实现
Server端接收消息并进行处理的实现
使用Netty作为底层通信库的原因
RocketMQ中RPC通信的Netty多线程模型
Netty的Reactor多线程模型设计概念与简述
RocketMQ中RPC通信的1+N+M1+M2的Reactor多线程设计与实现
安装参数
启动相关
Q&A
整体架构分析
基本流程
RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发,包含了4个模块:
Namesrv: 存储当前集群所有Broker信息,以及Topic和Broker的关系。
Broker: 集群最核心模块,主要负责当前Topic消息存储,消费者的消费位点管理(消费进度)。
Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID,同一个ID下所有实例组成一个生产者集群。
Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费实例可以共用同一个ID,同一个ID下所有实例组成一个消费者集群。
工作流程描述:
1、启动Namesrv,启动后监听端口,等待Broker/Producer/Consumer连接上来,相当于一个路由控制中心。
2、Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,Namesrv集群中就有Topic和Broker的映射关系。
3、收发消息前,先创建Topic,创建Topic时需要指定topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4、Producer启动并发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
5、Consumer跟Producer类似,跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker,然后直接跟Broker建立连接通道,开始消费消息。
模块特性
- Namesrv
Namesrv用户存储Topic、Broker关系信息,功能简单,稳定性高。多个Namesrv之间没有通信,所以Broker往往将多个Namesrv的地址都配置上。Namesarv的压力不会太大,主要的开销在于维持心跳和提供Topic-Broker的关系数据。Broker向Namesrv发送心跳时,会带上自己负责的topic信息,如果Topic个数太多(万级别),导致一次心跳的数据就有可能几十M,容易造成Namesrv误以为Broker心跳失败。
- Broker
- 高并发读写服务
Broker的高并发主要依靠两点:消息顺序写,所有Topic数据只会写一个文件,一个文件满1G,再写新文件,真正的顺序写盘,TPS大幅度提高;消息随机读,RocketMQ尽可能的让读命中系统pagecache。
- 负载均衡和动态伸缩
负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。
动态伸缩能力:Broker的伸缩性提现在两个维度:Topic,Broker
Topic维度:假如一个Topic的消息量特别大,但是集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker即可。
- 高可用和高可靠
高可用:集群部署时候一般为主备,备机从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。
高可靠:所有往Broker发的消息,有同步刷盘和异步刷盘;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失。
- Broker与Namesrv的心跳机制
单个Broker跟所有的Namesrv保持心跳,间隔为30秒,心跳请求包含了当前Broker的所有Topic信息。Namesrv同样会反查Broker的心跳,如果某个Borker在两分钟之内都没有心跳,则任务该Broker下线,调整Topic跟Broker的对应关系。
- Consumer
消费者启动时候指定了Namesrv的地址,与其中一个保持长连接。消费者每个30秒从Namesrv获取所有topic的最新队列信息。连接建立后,从Namesrv获取当前消费Topic所涉及的Broker,直连Broker。
Consumer同样跟Broker保持长连接,会每隔30s发心跳信息到Broker,Broker每10s检查一次当前存活消费者,如果发现2分钟没有心跳,就断开与该消费者的链接,并且向消费组的其他实例发送通知,触发消费集群的负载均衡。
消费者的消费方式有两种:集群消费、广播消费。
- Producer
生产启动后同样选择一台Namesrv进行长连接。同样从Namesrv获取Topic和Broker的信息。并发发送心跳。生产者发送消息时,会轮询当前可发送的Broker,一次发送成功后,下次发送到另外一个,已达到负载均衡。
发送消息流程原理分析
从功能上说RocketMQ支持三种发送消息的方式,分别死同步发送、异步发送和直接发送。
同步发送 sync
只有在消息完全发送完成后才返回结果,此方式需要同步等待发送结果的时间代价。
有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次。发送的结果存在同一个消息可能被多次发送给Broker,需要开发者在消费端处理幂等性问题。
异步发送 async
Message msg = new Message("topic", "tag", ("Hello world").getBytes()); producer.send(msg, new SendCallBack() { // callback method after send success public void onSuccess(SendResult sendResult) { System.out.println("print send result:" + sendResult); } // callback method after send failure public void onException(Throwable a) { System.out.println("get a exception:" + e); } }))
异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送成功或失败。异步模式通常用于响应时间敏感的业务 。同样存在重试机制和发送同一消息的情况,需要在消费端处理幂等性问题。
直接发送 one-way
producer.sendOneWay(msg);
采用one-way发送模式发送消息的时候,发送端发送完消息后会立刻返回,不会等待来自Broker的ack来告知本次消息发送是否完全完成发送。此方式的吞吐量很大,但是存在消息丢失的风险,适合不重要的消息发送,比如日志。
主从同步(HA)机制分析
基本流程图如下:
RocketMQ的主从同步机制如下:
1、首先启动Master并在指定端口监听;
2、客户单启动,主动连接Master,建立TCP连接;
3、客户端以5秒的间隔时间向服务器拉取消息,如果第一次拉取的话,先获取本地commitlog文件中最大的偏移量,以该偏移量向服务端拉取消息;
4、服务端解析请求,并返回一批数据给客户端。
5、客户端收到一批消息后,将消息写入到本地commitlog文件中,然后向Master汇报拉取进度,并更新下一次待拉取偏移量;
6、重复第3步。
RocketMQ主从同步一个重要的特征:主从同步不具备主从切换功能,即当主节点宕机后,从不会接管消息发送,但是可以提供消息读取。
默认情况下,消息的去读都是主服务器,主从同步引入的主要目的是消息堆积的内容默认超过内存的40%,则消息读取由从服务器来接管,实现消息的读写分离,避免主服务的IO抖动严重。
HA消息消费进度机制:
1、消费者在反馈消息消费进度时会优先选择主服务器,此时主服务器的消费进度立马更新了,从服务器此时只需定时同步主服务器的消息消费进度即可。
2、消费者在向从服务器拉取消息时,如果是从服务器,在处理消息拉取时,也会更新消息消费进度。
消息投递
RocketMQ的消息模型如下:
一个Topic可能对应多个实际的消息队列(Message Queue)
在底层实现上,为了提供MQ的可用性和灵活性,一个Topic在实际存储的过程中,采用了多队列的方式,具体如上图所示,每个消息队列在使用中应当保证先入先出(FIFO)的方式进行消费。
- 默认情况下,采用最简单的轮询算法,这个算法可以保证每一个Queue队列的消息投递数量尽可能均匀。
- 对于默认情况下的增强方式:在某些Queue队列可能由于自身数量积压的原因,投递的过程比较长,对于这样的Queue队列会影响后续投递的效果。基于这样的情况,RocketMQ每发送一个消息后,会统计一下消息投递的时间延迟,根据这个延迟,知道哪些Queue队列投递的速度快。这样情况下优先使用消息投递延迟最小的策略,如果没有生效,再使用轮询算法。
- 在某些特定场景下,需要消息投递和消费的顺序性。对于相同订单号的消息,通过一定策略,将其放置在一个Queue队列中,然后消费者再采用一定策略(一个线程独立处理一个queue),就能保证消费的顺序性。
延迟投递/批量投递均可。
消息事务,Half Msg需要Producer确定commint或者rollback.
持久化机制
RocketMQ是一款高性能、高可靠的分布式消息中间件,要保证高可靠,数据就必须持久化到磁盘上,将数据持久化到磁盘上,那么可能不能保证高性能。一般磁盘的性能在顺序读写的情况下速度可以达到450MB/S-600MB/S,但是在随机读写的情况下,速度可能只有几百KB/S。RocketMQ在持久化的设计上,采取的是消息顺序写,随机读的策略,利用磁盘顺序写的速度,让磁盘的写速度不会成为系统的瓶颈。并且采用MMap这种“零拷贝”技术,提高消息存盘和网络发送的速度。极力满足RocketMQ的高性能、高可靠要求。
RocketMQ的持久化机制的架构图:
CommitLog: 消息真正的存储文件,所有消息都存储在CommitLog中。
ConsumeQueue:消息消费逻辑队列,类似数据库的索引文件。
IndexFile:消息索引文件,主要存储消息Key与Offset对应关系,提升消息检索速度。
Broker将消息顺序写入到CommitLog中,这也就是RocketMQ高性能的原因。但是消费者消费消息的时候往往只关心订阅主题下的所有消息,但是同一主题的消息在CommitLog文件中可能是不连续的,那么消费者消费消息的时候,需要将CommitLog文件加载到内存中遍历查找订阅主题下的消息,频繁的IO操作,性能就会急剧下降。
为解决该问题,RocketMQ引入了Consumequeue文件,可以看做是索引文件,类似于MySQL的二级索引。在存放了同一主题下的所有消息,消费者消费的时候只需要去对应的Consumequeue组中取消息即可。Consumequeue不负责存储消息,只负责记录它所属Topic的消息的CommitLog中的偏移量,这样消费者根据偏移量定位到消息,Consumequeue文件不会存储消息的全量信息。
RocketMQ的RPC通信
rocketmq-remoting模块是RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(rocketmq-client, rocketmq-broker, rocketmq-namesrv)所依赖和引用,为了实现客户端与服务端之间高效的数据情况与接收,RocketMQ消息队列自定义了通信信息并在Netty的基础之上扩展了通信模块。
RocketMQ中Remoting通信模块的具体实现
1、RemotingService:最上层的接口:
public interface RemotingService { void start(); void shutdown(); void registerRPCHook(RPCHook rpcHook); }
2、RemotingClient/RemotingServer:两个接口集成了最上层接口RemotingService,分别各自为Client和Server提供锁必须的方法
3、NettyRemotingAbstract:Netty通信处理的抽象类,定义并封装了Netty处理的公共处理方法
4、NettyRemotingClient/NettyRemotingServer:分别实现了RemotingClient和RemotingServer,都继承了NettyRemotingAbstract抽象类。RocketMQ中其他的组件(如client, nameserver, broker在进行消息的发送和接收时均使用这两个组件)。
消息的协议涉及与编码解码
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。
RemotingCommand类的部分成员变量如下:
Header字段 | 类型 | Request说明 | Response说明 |
code | int | 请求操作码,应答方根据不同的请求码进行不同的业务处理 | 应答响应码。0表示成功,非0则表示各种错误 |
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言 |
version | int | 请求方程序的版本 | 应答方程序的版本 |
opaque | int | 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回 |
flag | int | 区分是普通RPC还是onewayRPC的标志 | 区分是普通RPC还是onewayRPC的标志 |
remark | String | 传输自定义文本信息 | 传输自定义文本信息 |
extFields | HashMap | 请求自定义扩展信息 | 响应自定义扩展信息 |
例如Broker向NameServer发送一次心跳注册的报文:
code=103,//这里的103对应的code就是broker向nameserver注册自己的消息 language=JAVA, version=137, opaque=58,//这个就是requestId flag(B)=0, remark=null, extFields={ brokerId=0, clusterName=DefaultCluster, brokerAddr=ip1: 10911, haServerAddr=ip1: 10912, brokerName=LAPTOP-SMF2CKDN }, serializeTypeCurrentRPC=JSON
RocketMQ通信协议的格式如下:
1、消息长度:总长度、四个字节存储、占用一个int类型
2、序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度
3、消息头数据:经过序列化后的消息头数据
4、消息主体数据:消息主体的二进制字节数据内容
消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成。
消息的通信方法和通信流程
RocketMQ消息队列中支持通信的方式如下:同步、异步、单向
同步通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。
Client发送请求消息的具体实现
当客户端调用异步通信接口invokeAsync时候,先由RemotingClient的实现类-NettyRemotingClient根据addr获取相应的channel(如果本地缓存中没有则创建),随后调用invokeAsyncImpl方法,将数据流转给抽象类NettyRemotingAbstract处理(真正做完发送请求动作是在NettyRemotingAbstract抽象类的invokeAsyncImpl方法里面)。
/*** invokeAsync(异步调用)* * @param channel* @param request* @param timeoutMillis* @param invokeCallback* @throws InterruptedException* @throws RemotingTooMuchRequestException* @throws RemotingTimeoutException* @throws RemotingSendRequestException*/
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {//相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1final int opaque = request.getOpaque();boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);//根据request ID构建ResponseFuturefinal ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);//将ResponseFuture放入responseTablethis.responseTable.put(opaque, responseFuture);try {//使用Netty的channel发送请求数据channel.writeAndFlush(request).addListener(new ChannelFutureListener() {//消息发送后执行@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {//如果发送消息成功给Server,那么这里直接Set后returnresponseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseFuture.putResponse(null);responseTable.remove(opaque);try {//执行回调executeInvokeCallback(responseFuture);} catch (Throwable e) {log.warn("excute callback in writeAndFlush addListener, and callback throw", e);} finally {//释放信号量responseFuture.release();}log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));}});} catch (Exception e) {//异常处理responseFuture.release();log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");} else {String info =String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreAsync.getQueueLength(),this.semaphoreAsync.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}}
}
在Client端发送请求消息时有个比较重要的数据结构:
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable
opaque表示请求发起方在多个连接上不同的请求标识代码,每次发送一个消息的时候,可以选择同步阻塞/异步非阻塞的方法,无论哪种通信方式,都会保存请求操作码至ResponseFuture的Map映射-responseTable中。
ResponseFuture保存返回响应
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback, SemaphoreReleaseOnlyOnce once) { this.opaque = opaque; this.timeoutMillis = timeoutMillis; this.invokeCallback = invokeCallback; this.once = once; }
对于同步通信来说,第三、四个参数为null,对于异步通信来说,invokeCallback是在收到消息响应的时候能够根据responseTable找到请求码对应的回调执行方法,semaphore参数用作流控,当多个流程同时往一个连接写数据时可以通过信号量控制授权同时写许可的数量。
异常发送流程处理-定时扫描responseTable本地缓存
在发送消息时候,如果遇到异常情况(比如服务端没有response返回给客户端或者response因网络而丢失),上面所述的responseTable的本地缓存Map将出现堆积情况,这个时候需要一个定时任务来专门做responseTable的清理回收。在RocketMQ的客户端/服务端启动时候会产生频率为1s调用一次的定时任务检查所有的responseTable缓存中的responseFuture变量,判断是否已经得到返回,并进行相应的处理。
Server端接收消息并进行处理的实现
Server端接收消息的处理入口在NettyServerHandler类的channelRead0方法中,其中调用了processMessageReceived方法,其中服务端最为重要的处理方法实现如下:
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {//根据RemotingCommand中的code获取processor和ExecutorServicefinal Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {Runnable run = new Runnable() {@Overridepublic void run() {try {//rpc hookRPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();if (rpcHook != null) {rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);}//processor处理请求final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);//rpc hookif (rpcHook != null) {rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);}if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {PLOG.error("process request over, but response failed", e);PLOG.error(cmd.toString());PLOG.error(response.toString());}} else {}}} catch (Throwable e) {if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException".equals(e.getClass().getCanonicalName())) {PLOG.error("process request exception", e);PLOG.error(cmd.toString());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //RemotingHelper.exceptionSimpleDesc(e));response.setOpaque(opaque);ctx.writeAndFlush(response);}}}};if (pair.getObject1().rejectRequest()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[REJECTREQUEST]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);return;}try {//封装requestTaskfinal RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);//想线程池提交requestTaskpair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {if ((System.currentTimeMillis() % 10000) == 0) {PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //+ ", too many requests and system thread pool busy, RejectedExecutionException " //+ pair.getObject2().toString() //+ " request code: " + cmd.getCode());}if (!cmd.isOnewayRPC()) {final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,"[OVERLOAD]system busy, start flow control for a while");response.setOpaque(opaque);ctx.writeAndFlush(response);}}} else {String error = " request type " + cmd.getCode() + " not supported";//构建responsefinal RemotingCommand response =RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);response.setOpaque(opaque);ctx.writeAndFlush(response);PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);}
}
上面的请求处理方法中根据RemotingCommand的请求业务码来匹配到相应的业务处理器,然后生成一个新的线程提交至对应的业务线程池进行异步处理。
processorTable - 请求业务码与业务处理、业务线程池的映射变量
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
使用Netty作为底层通信库的原因
- Netty的编程API使用简单,开发门槛低,无需开发人员去关注和了解太多的NIO编程模型和概念。
- 可根据业务的要求进行定制化开发,通过Netty的ChannelHandler对通信框架进行灵活的定制化扩展
- Netty框架本身支持拆包、解包,异常检测等机制,让开发人员从JavaNIO的细节中解脱,只需要关于业务的本身。
- Netty解决了JDK NIO的Bug(Epoll的BUG,会导致Selector空轮询,最终导致CPU100%)
- Netty框架内部对线程,Selector做了一些细节的优化,精心设计的Reactor多线程模型可以实现非常高效的并发处理
- Netty已经在多个项目(Hadoop的RPC框架avro使用Netty作为通信框架)中都得到过充分验证。
RocketMQ中RPC通信的Netty多线程模型
RocketMQ的RPC通信部分采用了“1+N+M1+M2”的Reactor多线程模型,对网络通信部分进行一定的扩展与优化。
Netty的Reactor多线程模型设计概念与简述
Reactor多线程模型的设计思路是分而治之和事件驱动:
分而治之:一个网络请求连接的完整处理过程可以分为接受(accept),数据读取(read),解码/编码(decode/encode),业务处理(process),发送响应(send)这几个步骤。Reactor模型将每个步骤都映射为一个任务,服务端线程执行的最小逻辑单元不再是一次完成的网络请求,而是这个任务,且采用以非阻塞方式执行。
事件驱动:每个任务对应特定网络事件,当任务准备就绪时,Reactor收到对应的网络事件通知,并将任务分发给绑定了对应网络事件的Handler执行。
RocketMQ中RPC通信的1+N+M1+M2的Reactor多线程设计与实现
RocketMQ中RPC通信的Reactor多线程设计与流程
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。下面先给出一张RocketMQ的RPC通信层的Netty多线程模型框架图。
上图的框架图中可以大致了解RocketMQ中的NettyRemotingServer的Reactor多线程模型。一个Reactor主线程(eventLoopGroupBoss,即为上面的1)负责监听TCP网络连接请求,建立好连接后丢给Reactor线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认为3),它负责将建立好连接的socket注册到Selector上去(RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据,拿到网络数据后,再丢给Worker线程池(defaultEventExecutorGroup,即为上面的“M1”,源码中默认为8)。
为了更为高效的处理RPC的网络请求,这里的Worker线程池是专门用于处理Netty网络通信相关的(包括编码/解码,空闲链接管理、网络连接管理以及网络请求处理)。而处理业务操作放在业务线程中执行,根据RemotingCommand的业务请求码code去processorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的“M2”)。
下面以表格的方式列举了下上面所述的“1+N+M1+M2”Reactor多线程模型
线程池 | 线程名 | 线程具体说明 |
1 | NettyBoss_%d | Reactor主线程 |
N | NettyServerEPOLLSelector_%d_%d | Reactor线程池 |
M1 | NettyServerCodecThread_%d | Worker线程池 |
M2 | RemotingExecutorThread_%d | 业务processor处理线程池 |
RocketMQ中RPC通信的Reactor多线程的代码具体实现
在NettyRemotingServer的实例初始化时,会初始化各个相关的变量包括serverBootstrap、nettyServerConfig参数、channelEventListener监听器并同时初始化eventLoopGroupBoss和eventLoopGroupSelector两个Netty的EventLoopGroup线程池(如果Linux平台,并且开始了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll,否则,就用Java NIO的NioEventLoopGroup)。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;//省略部分代码//初始化时候nThreads设置为1,说明RemotingServer端的Disptacher链接管理和分发请求的线程为1,用于接收客户端的TCP连接this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));}});/*** 根据配置设置NIO还是Epoll来作为Selector线程池* 如果是Linux平台,并且开启了native epoll,就用EpollEventLoopGroup,这个也就是用JNI,调的c写的epoll;否则,就用Java NIO的NioEventLoopGroup。* */if (useEpoll()) {this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}//省略部分代码
在NettyRemotingServer实例初始化完成后,就会将其启动。Server端在启动阶段会将之前实例化好的1个acceptor线程(eventLoopGroupBoss),N个IO线程(eventLoopGroupSelector),M1个Worker线程(defaultEventExecutorGroup)绑定上去。
Worker线程拿到网络数据后,就交给Netty的ChannelPipeline(其采用责任链设计模式),从Head到Tail的一个个Handler执行下去,这些Handler是在创建NettyRemotingServer实例时候指定的。NettyEncoder和NettyDecoder负责网络传输数据和RemotingCommand之间的编解码。NettyServerHandler拿到的解码得到的RemotingCommand后,根据RemotingCommand.type来判断是request还是response来进行相应处理,根据业务请求码封装成不同的task任务后,提交给对应的业务processor处理线程池处理。
@Override
public void start() {//默认的处理线程池组,使用默认的处理线程池组用于处理后面的多个Netty Handler的逻辑操作this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});/*** 首先来看下 RocketMQ NettyServer 的 Reactor 线程模型,* 一个 Reactor 主线程负责监听 TCP 连接请求;* 建立好连接后丢给 Reactor 线程池,它负责将建立好连接的 socket 注册到 selector* 上去(这里有两种方式,NIO和Epoll,可配置),然后监听真正的网络数据;* 拿到网络数据后,再丢给 Worker 线程池;**///RocketMQ-> Java NIO的1+N+M模型:1个acceptor线程,N个IO线程,M1个worker 线程。ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)//服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小.option(ChannelOption.SO_REUSEADDR, true)//这个参数表示允许重复使用本地地址和端口.option(ChannelOption.SO_KEEPALIVE, false)//当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。.childOption(ChannelOption.TCP_NODELAY, true)//该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//这两个参数用于操作接收缓冲区和发送缓冲区.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,new HandshakeHandler(TlsSystemConfig.tlsMode)).addLast(defaultEventExecutorGroup,new NettyEncoder(),//rocketmq解码器,他们分别覆盖了父类的encode和decode方法new NettyDecoder(),//rocketmq编码器new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty自带的心跳管理器new NettyConnectManageHandler(),//连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理。new NettyServerHandler()//当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发 );}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}if (this.channelEventListener != null) {this.nettyEventExecutor.start();}//定时扫描responseTable,获取返回结果,并且处理超时this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}
从上面的描述中可以概括得出RocketMQ的RPC通信部分的Reactor线程池模型框图。
整体可以看出RocketMQ的RPC通信借助Netty的多线程模型,其服务端监听线程和IO线程分离,同时将RPC通信层的业务逻辑与处理具体业务的线程进一步相分离。时间可控的简单业务都直接放在RPC通信部分来完成,复杂和时间不可控的业务提交至后端业务线程池中处理,这样提高了通信效率和MQ整体的性能。(ps:其中抽象出NioEventLoop来表示一个不断循环执行处理任务的线程,每个NioEventLoop有一个Selector,用于监听绑定在其上的socket链路)。
安装参数
在安装过程中,先启动namesrv,后启动broker,broker中配置几个关键要素为:
- brokerClusterName:集群名称,所有broker配置的必须一致
- brokerName:broker名称,M-S之间必须一致,比如broker-a/broker-a(M/S),broker-b/broker-b
- brokerId:0表示Master,>0表示slave,比如1M2S,那么M是0,Slave是1,Slave2配置时2
- namesrvAddr:namesrv地址
- store*:持久化数据存储目录
- brokerRole:Broker的角色
- flushDiskType:刷盘方式,2m2s-sync中,master默认为SYNC_FLUSH,默认为ASYNC_FLUSH
启动相关
启动脚本./bin/mqbroker,实际作用是使用java命令运行org.apache.rocketmq.broker.BrokerStartup类。即broker的入口类是org.apache.rocketmq.broker.BrokerStartup。
如何保证消息的可靠性传输?要是消息丢失了怎么办?
生产阶段、存储阶段、消费阶段三个方面来讨论。
最终一致性:RocketMQ是一种最终一致性的分布式事务,就是说他保证的是消息最终一致性,而不是像2PC,3PC,TCC那样的强一致性分布式事务。
半消息:指暂时不能被Consumer消费的消息,Producer已经把消息成功发送到Broker端,但是此消息被标记暂不能投递,处于该种状态下的消息为半消息。需要Producer对消息的二次确认后,Consumer才能去消费他
消息回查:由于网络闪断、生产者应用重启等原因,导致Producer端一直没有对Half Message进行二次确认,Broker服务器会定时扫描长期处于半消息的信息,会主动询问Producer端该消息的最终状态(Commit或者Rollback),该消息为消息回查。
对于生产者来说,可以向多个master的broker去发送消息,同时可以发送同步消息和异步消息返回服务方消息的应答,保证消息是否发送成功,对于消息,有同步刷盘和异步刷盘机制,主从之间也有同步复制和异步复制,保证了消息不丢失。同时我们也可以把消息记录的日志文件或者表中,RocketMQ消息的存储是有ConsumerQueue和ConmmitLog配合完成,消息真正的物理存储文件是CommitLog,ConsumerQueue是消息的逻辑队列,类似于数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumerQueue文件,通过刷盘和复制机制来保证数据的高可用。
对于消费者来说,消费者既可以消费broker的master,有可以消费broker的slave,当master宕机之后,会自动切换的slave进行消费。对于广播的消息来说,我们可以进行消息的重试,消息队列RocketMQ默认允许每条消息最多重试16次,一条消息无论重试多少次,这些重试消息的MessageID不会改变。
当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确消费该消息,此时,消息队列RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
Broker端保证消息可靠性,从架构层次来说明,常见的架构策略:
双主双从架构、NameServer多节点,同步双写,异步刷盘,消息在内存中,突然断电消息丢失,同步刷盘可靠性更高,消息持续化到磁盘,同城双活,异地多活,跨国多活。
Q&A
如何进行消息的重试机制?
Producer通过网络发送消息给Broker,当Broker收到之后,会返回确认响应信息给Producer,所以只要生产者接收到返回的确认响应,就代表消息在生产阶段未丢失。
消费端消费消息后,需要给Broker返回消费状态。Consumer端的重试机制包括两种情况:异常重试,由于Consumer端逻辑出现了异常,导致了返回了RECONSUMER_LATER状态,那么Broker就会在一段时间后重试;超时重试:如果Consumer端处理时间很长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为消费超时,发起超时重试。
消息模式为MessageModel.CLUSTERING集群模式下,Broker才会自动进行重试,广播消息时不会重试的。
并且这样的重试机制难免会出现消息重复消费的情况,所以在消费端需要做好幂等性。
如何保证消息不被重复消费?或者说如何保证消息消费时的幂等性?
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置
相关文章:
简述Apache RocketMQ
整体架构分析 基本流程 模块特性 发送消息流程原理分析 同步发送 sync 异步发送 async 直接发送 one-way 主从同步(HA)机制分析 消息投递 持久化机制 RocketMQ的RPC通信 RocketMQ中Remoting通信模块的具体实现 消息的协议涉及与编码解码 消…...
AI融合SEO关键词实战指南
内容概要 随着人工智能技术的迭代升级,SEO关键词策略正经历从人工经验驱动向数据智能驱动的范式转变。本指南聚焦AI技术在搜索引擎优化中的系统性应用,通过构建多层技术框架实现关键词全生命周期管理。核心方法论涵盖语义分析引擎的构建原理、基于NLP的…...
RK3588 实现音视频对讲
RK3588 实现音视频对讲方案 RK3588是瑞芯微推出的一款高性能处理器,非常适合用于音视频对讲系统的开发。以下是基于RK3588实现音视频对讲的方案概述: 硬件架构 核心处理器:RK3588 (4xCortex-A76 4xCortex-A55)视频处理: 内置8…...
OSPF区域间路由计算
ABR:区域边界路由器,连接两个不同区域的设备就称为ABR(不同厂商不同,定义很模糊) ASBR:自治系统边界路由器,引入了外部路由,将不是自治系统外部的不是OSPF路由的条目变成OSPF路由条目…...
NAT、代理服务、内网穿透
NAT、代理服务、内网穿透 1、NAT1.1、NAT过程1.2、NAPT2、内网穿透3、内网打洞3、代理服务器3.1、正向代理3.2、反向代理1、NAT 1.1、NAT过程 之前我们讨论了IPv4协议中IP地址数量不充足的问题。NAT技术是当前解决IP地址不够用的主要手段,是路由器的一个重要功能。 NAT能够将…...
阿尔特拉 EP1C12F324I7N AlteraFPGA Cyclone
EP1C12F324I7N 属于 Altera Cyclone I 系列 FPGA 中的中低密度型号,面向成本敏感、功耗受限的嵌入式与数据通路应用。该器件采用 0.13 μm 全层铜 SRAM 工艺,集成约 12 060 个逻辑单元(LE)、239 616 位片上 RAM、249 路可编…...
解决“驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接“问题
参考链接: https://blog.csdn.net/yyj12138/article/details/123073146...
QtApplets-实现应用程序单例模式,防止重复运行
QtApplets-实现应用程序单例模式,防止重复运行 文章目录 QtApplets-实现应用程序单例模式,防止重复运行摘要引言实现原理核心代码实现头文件定义实现文件 使用方法技术要点解析1. 文件锁机制2. 进程 ID 管理3. Windows 互斥量4. 跨平台兼容 注意事项…...
nodejs使用pkg打包文件
pkg配置 "pkg": {"assets": ["*.html","*.css","*.js"],"mirror": "https://npmmirror.com/mirrors/node-v8-compile-cache/"},"bin": "server.js",嵌入到exe中的资源使用assets打…...
学习笔记十六——Rust Monad从头学
🧠 零基础也能懂的 Rust Monad:逐步拆解 三大定律通俗讲解 实战技巧 📣 第一部分:Monad 是什么? Monad 是一种“包值 链操作 保持结构”的代码模式,用来处理带上下文的值,并方便连续处理。 …...
Idea连接远程云服务器上的MySQL,开放云服务器端口
1.开放云服务器的3306端口 (1)进入到云服务器的控制台 (2)点击使用的云服务器 (3)点击 配置安全组规则 (4)添加规则 (5)开放端口 2.创建可以远程访问…...
云服务器CVM标准型S5实例性能测评——2025腾讯云
腾讯云服务器CVM标准型S5实例具有稳定的计算性能,CPU采用采用 Intel Xeon Cascade Lake 或者 Intel Xeon Cooper Lake 处理器,主频2.5GHz,睿频3.1GHz,CPU内存配置2核2G、2核4G、4核8G、8核16G等配置,公网带宽可选1M、3…...
【Pytorch之一】--torch.stack()方法详解
torch.stack方法详解 pytorch官网注释 Parameters tensors:张量序列,也就是要进行stack操作的对象们,可以有很多个张量。 dim:按照dim的方式对这些张量进行stack操作,也就是你要按照哪种堆叠方式对张量进行堆叠。dim的…...
监控+日志=DevOps 运维的“千里眼”与“顺风耳”
监控+日志=DevOps 运维的“千里眼”与“顺风耳” 在 DevOps 体系中,监控和日志管理是不可或缺的运维基石。有人说,开发只管把代码写好,运维才是真正的“操盘手”,让系统稳定运行、不宕机、不崩溃。而要做到这一点,精准的监控与日志管理 是关键。 试想一下:如果没有监控…...
实战|使用环信Flutter SDK构建鸿蒙HarmonyOS应用及推送配置
本文为大家介绍如何在 Flutter 环境创建 Harmony 项目并集成环信即时通讯IM以及环信 Flutter Harmony 推送配置。 已经基于环信的 Flutter 项目也可以参考本文适配鸿蒙端。 一、开发环境要求 前置条件 1.安装DevEco-Studio 2.安装模拟器 DevEco-Studio 下载与操作指导&…...
构建知识体系
我认为,仅仅建立知识点之间的连接还不足够,还要建立自己的知识体系。 那么什么是知识体系呢? 知识体系,可以理解为立体的知识系统。 立体的知识系统,代表着跨越了多个领域、行业、学科的知识,是多个层面…...
Android Mainline简介
关键要点 Android Mainline 是通过模块化更新 Android 核心组件的框架,可能提高安全性。允许通过 Google Play 系统更新分发模块,无需完整固件更新。能简化厂商工作并减少碎片化,但覆盖范围有限。 什么是 Android Mainline? And…...
2026《数据结构》考研复习笔记二(C++面向对象)
C面向对象 一、类二、继承三、重载运算符和重载函数四、多态代码示例 一、类 1.1类&对象 class classname//class是关键词,classname是类名 { Access specifiers://访问修饰符:private/public/protected Date members/variables;//变量 Member fun…...
【C++】12.list接口介绍
在C标准库中,std::list 是一个基于双向链表实现的顺序容器,它支持高效的插入和删除操作,但无法直接通过下标进行随机访问。以下是关于 std::list 的简单介绍: 核心特性 底层结构 双向链表实现,每个节点包含数据、前驱指…...
决策卫生问题:考公考编考研能补救高考选取职业的错误吗
对于决策者来说,“认识你自己”是一个永恒的主题;警惕认知中的缺陷,比什么都重要。在判断与决策问题上,管理者和专业人士往往都非常自信。人类远远不如我们想象的那么理性,人类的判断也远远不如我们想象的那么完美。在…...
考研系列-计算机网络-第一章、计算机网络体系结构
一、计算机网络概述 1.知识点总结 性能指标: 注意这个指标: 2.习题总结 (一)选择题 广域网点对点,局域网广播技术 (二)简答题 (1)概念性题目: (2)计算型题目 这个题目主要是注意两种交换方式: 电路交换:…...
状态模式:有限状态机在电商订单系统中的设计与实现
状态模式:有限状态机在电商订单系统中的设计与实现 一、模式核心:用状态切换驱动行为变化 在电商订单系统中,订单状态会随着用户操作动态变化:「已创建」的订单支付后变为「已支付」,发货后变为「已发货」࿰…...
nohup命令使用说明
文章目录 如何在后台运行程序呢?如何正常运行代码重定向呢?nohup: ignoring input 如何在后台运行程序呢? 使用nohup命令即可, nohup python dataset/ReferESpatialDataset.py >>dataset_20250417.log 2>&1 &n…...
使用原生button封装一个通用按钮组件
效果图 代码 <script lang"ts" setup> import { computed, ref, watch } from "vue";/*** 按钮属性接口*/ interface ButtonProps {/** 按钮类型:default(默认)/dark/plain/link */type?: "default" | "dark" | &q…...
osu ai 论文笔记 DQN
e https://theses.liacs.nl/pdf/2019-2020-SteeJvander.pdf Creating an AI for the Rhytm Game osu! 20年的论文 用监督学习训练移动模型100首歌能达到95准确率 点击模型用DQN两千首歌65准确率 V抖用的居然不是强化学习? 5,6星打96准确度还是有的东西的 这是5.…...
perf 的使用方法
perf的架构 1.perf event event are pure kernel counters, in this case they are called software events. Examples include: context-switches, minor-faults.events is the processor itself and its Performance Monitoring Unit (PMU). It provides a list of events …...
【MCP教程】Claude Desktop 如何连接部署在远程的remote mcp server服务器(remote host)
前言 最近MCP特别火热,笔者自己也根据官方文档尝试了下。 官方文档给的Demo是在本地部署一个weather.py,然后用本地的Claude Desktop去访问该mcp服务器,从而完成工具的调用: 但是,问题来了,Claude Deskto…...
使用python帮助艺术家完成角色动画和服装模型等任务
使用python帮助艺术家完成角色动画和服装模型等任务 声明:克隆项目第 1 步:准备 Python 环境第 2 步:安装依赖✅ 第 3 步:运行项目主入口报错:报错:**降级 Python 到 3.10 或 3.11**推荐版本: 创…...
Python爬虫实战:基于 Python Scrapy 框架的百度指数数据爬取研究
一、引言 1.1 研究背景 在当今信息时代,市场调研和趋势分析对于企业和研究机构至关重要。百度指数能够精准反映关键词在百度搜索引擎上的热度变化情况,为市场需求洞察、消费者兴趣分析等提供了极具价值的数据支持。通过对百度指数数据的爬取和分析,企业可以及时调整营销策略…...
【Python】python系列之函数闭包概念
目录 一、函数 二、闭包 2.1 概念 2.2闭包的应用场景 2.3代码实例 实例 1:简单计数器闭包 实例 2:带参数的闭包 实例 3:闭包用于数据封装和隐藏 一、函数 函数是实现特定功能的代码段的封装,在需要时可以多次调用函数来实…...
【React】什么是 Hook
useStateuseEffectuseRef 什么是hook?16.8版本出现的新特性。可以在不编写class组件的情况下使用state以及其它的React特性 为什么有hook?class组件很难提取公共的重用的代码,然后反复使用;不编写类组件也可以使用类组件的状态st…...
香港科技大学广州|智能交通学域博士招生宣讲会—北京理工大学专场
香港科技大学广州|智能交通学域博士招生宣讲会—北京理工大学专场 🕙时间:4月23日(星期三)16:00 🏠地点:北京理工大学中关村校区唯实报告厅 🔗报名链接:https://www.wj…...
食品计算—Coarse-to-fine nutrition prediction
🌟🌟 欢迎来到我的技术小筑,一个专为技术探索者打造的交流空间。在这里,我们不仅分享代码的智慧,还探讨技术的深度与广度。无论您是资深开发者还是技术新手,这里都有一片属于您的天空。让我们在知识的海洋中…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(6):ながら 一边。。一边
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(6):ながら 一边。。一边 1、前言(1)情况说明(2)工程师的信仰 2、知识点(1)ながら1)一边。。一边2࿰…...
Electricity Market Optimization(VI) - 机组组合模型以及 Gurobi 求解
本文参考链接:link \hspace{1.6em} 机组组合问题在电力系统中非常重要,这个问题也是一个优化问题,研究的就是如何调度现有的机组,调度的对象是以煤炭、石油、天然气为燃料的火力发电机以及水力发电机等可预测处理的发电机组&#…...
LoRA个关键超参数:`LoRA_rank`(通常简称为 `rank` 或 `r`)和 `LoRA_alpha`(通常简称为 `alpha`)
LoRA (Low-Rank Adaptation) 中的两个关键超参数:LoRA_rank(通常简称为 rank 或 r)和 LoRA_alpha(通常简称为 alpha)。 LoRA 的核心思想是,在对大型预训练模型(如 LLM 或 Stable Diffusion&…...
Sql刷题日志(day3)
一、笔试 1、min(date_time):求最早日期 2、mysql中distinct不能与order by 连用,可以用group by去重 二、面试 1、SQL中如何利用replace函数统计给定重复字段在字符串中的出现次数 (length(all_string)-length(all_string,目标字符串,))/length(ta…...
【AI插件开发】Notepad++ AI插件开发实践:实现对话窗口功能
引言 之前的文章已经介绍实现了AI对话窗口,但只有个空壳,没有实现功能。本次将集中完成对话窗口的功能,主要内容为: 模型动态切换:支持运行时加载配置的AI模型列表交互式输入处理:实现多行文本输入与Ctrl…...
[GESP202409 二级] 小杨的 N 字矩阵 题解
#include<bits/stdc.h> #define int long long using namespace std; int m, a[55][55], sum; signed main(){cin >> m;for(int i 1; i < m; i ){a[i][1] 1;//第一列a[i][m] 1;//第m列sum ;a[i][sum] 1;//斜着的}for(int i 1; i < m; i ){for(int j 1;…...
第八章:探索新兴趋势:Agent 框架、产品与开源力量
引言 在前两章的实战中,我们已经掌握了如何使用 LangChain、LlamaIndex、AutoGen 和 CrewAI 这些主流框架来构建 AI Agent,无论是单个智能体还是协作的多 Agent 系统。然而,AI Agent 领域的发展日新月异,如同奔腾的河流ÿ…...
条款05:了解C++默默编写并调用哪些函数
目录 1.默认生成的函数 2.无法生成的情况 2.1当成员函数有引用 或者 被const修饰 2.2.operator在基类被私有 1.默认生成的函数 class empty {};//相当于class empty { public:empty(){ ... } // 构造函数empty(const empty& rhs) { ... }// 拷贝构造~empty(){ ... } //…...
Vue3 中封装函数实现加载图片加载失败兜底方案。
文章目录 Vue3 中使用动态加载图片并处理加载失败的情况实现思路代码实现代码解析注意事项扩展功能总结 Vue3 中使用动态加载图片并处理加载失败的情况 在开发 Vue3 应用时,我们经常会遇到需要动态加载图片的场景。例如,图片资源可能从后端获取…...
微机控制电液伺服汽车减震器动态试验系统
微机控制电液伺服汽车减震器动态试验系统,用于对汽车筒式减震器、减震器台架、驾驶室减震装置、发动机悬置软垫总成、发动机前置楔形支撑总成等的示功图试验、速度特性试验。 主要的技术参数: 1、最大试验力:5kN; 2、试验力测量精…...
如何简单几步使用 FFmpeg 将任何音频转为 MP3?
在多媒体处理领域,FFmpeg 以其强大的功能和灵活性而闻名。无论是视频编辑、音频转换还是流媒体处理,它都是专业人士和技术爱好者的首选工具之一。在这篇文章中简鹿办公将重点介绍如何使用 FFmpeg 进行音频格式转换,提供一些常用的转换方式&am…...
【软考-系统架构设计师】ATAM方法及效用树
软件架构设计中ATAM方法及效用树深度解析 一、ATAM方法核心框架与流程 ATAM(架构权衡分析方法)是由卡耐基梅隆大学提出的系统性架构评估方法,旨在通过多维度质量属性分析识别架构风险、敏感点与权衡点。其实施流程分为四阶段九步骤…...
2025第十七届“华中杯”大学生数学建模挑战赛题目B 题 校园共享单车的调度与维护问题完整成品正文33页(不含附录)文章思路 模型 代码 结果分享
校园共享单车运营优化与调度模型研究 摘 要 本研究聚焦校园共享单车点位布局、供需平衡、运营效率及故障车辆回收四大核心问题,通过构建一系列数学模型,系统分析与优化共享单车的运维体系。 针对问题一,我们建立了基于多时段观测的库存估算…...
React Native 0.79 稳定版发布,更快的工具、更多改进
React Native 0.79 已发布。此版本在多个方面进行了性能改进,并修复了一些漏洞。首先,得益于延迟哈希技术,Metro 的启动速度变快了,并且对包导出提供了稳定支持。由于 JS 包压缩方式的改变等原因,Android 的启动时间也…...
中国AI应用革命开启新纪元:从DeepSeek燎原到全栈生态崛起
当生成式AI的星火点燃华夏大地,一场由DeepSeek引发的智能革命正在重构中国产业版图。在这场算力与智慧的角逐中,全产业链的协同创新正在书写中国式AI进化的新范式。 一、全栈突围:AI基础设施生态全面升维 云端启航:头部云服务商…...
生物系统中的随机性及AI拓展
生物系统远非确定性的机器,而是本质上充满噪声的。这种随机性,或称偶然性,在塑造细胞行为和结果方面起着至关重要的作用。从基因表达到细胞命运决定,波动和不可预测的事件可以显著影响生物过程。理解和建模这种固有的变异性对于全…...
智能交响:EtherCAT转Profinet网关开启汽车自动化通信新纪元
在汽车制造行业,随着自动化程度的不断提升,设备之间的高效通信显得尤为重要。以吉利汽车西安制造基地为例,生产线中广泛应用了西门子PLC与机器人手臂等设备,这些设备分别采用了Profinet和EtherCAT通信协议。为实现不同协议设备之间…...