当前位置: 首页 > news >正文

RabbitMQ ④-持久化 || 死信队列 || 延迟队列 || 事务

在这里插入图片描述

消息确认机制

简单介绍

RabbitMQ Broker 发送消息给消费者后,消费者处理该消息时可能会发生异常,导致消费失败。

如果 Broker 在发送消息后就直接删了,就会导致消息的丢失。

为了保证消息可靠到达消费者并且成功处理了该消息,RabbitMQ 提供了消息确认机制。

消费者在订阅队列时,可以指定 autoAck 参数,该参数指定是否自动确认消息。

  • autoAck=true:消费者接收到消息后,自动确认消息,RabbitMQ Broker 立即删除该消息。
  • autoAck=false:消费者接收到消息后,不自动确认消息,需要消费者调用 channel.basicAck() 方法确认消息。如果消费者处理消息时发生异常,则可以调用 channel.basicNack() 方法,表示不确认该消息的接收。

Spring AMQP 提供了三种模式的消息确认

  • AcknowledgeMode.NONE:消息一经发送,就不管它了,不管消费者是否处理成功,都直接确认消息。
  • AcknowledgeMode.AUTO(默认):自动确认,消息接收后,消费者处理成功时自动确认该消息,如果处理时发送异常,则不会确认消息。
  • AcknowledgeMode.MANUAL:手动确认,消息接收后,消费者处理成功时,需要调用 channel.basicAck() 方法确认消息,如果处理时发送异常,则需要调用 channel.basicNack() 方法,表示不确认该消息的接收。

代码示例

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: manual
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}@Bean("ackExchange")public DirectExchange ackExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();}
//    @Bean("binding")
//    public Binding binding(Exchange exchange, Queue queue) {
//        return BindingBuilder.bind(queue)
//                .to(exchange)
//                .with("ack")
//                .noargs();
//    }@Bean("binding1")public Binding binding1(@Qualifier("ackExchange") DirectExchange exchange, @Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "消费者消息确认喵~");return "发送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模拟处理业务逻辑");int a = 3 / 0;System.out.println("模拟处理业务完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

持久性机制

简单介绍

前面讲了消费端处理消息时,消息如何不丢失,但是如何保证 RabbitMQ 服务停掉以后,生产者发送的消息不丢失呢。默认情况下, RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息。

为了保证消息持久化,RabbitMQ 提供了持久化机制,分别是:交换机持久化、队列持久化和消息持久化。

  • 交换机持久化:使用 ExchangeBuilder.durable(true) 方法创建的交换机,RabbitMQ 会将交换机信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。
  • 队列持久化:使用 QueueBuilder.durable(true) 方法创建的队列,RabbitMQ 会将队列信息持久化到磁盘,重启 RabbitMQ 后可以自动恢复。
  • 消息持久化:消息持久化可以保证消息不丢失,即使 RabbitMQ 重启或者崩溃,消息也不会丢失。

将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能,这是因为写入磁盘的速度相比于写入内存的速度还是很慢的,对于可靠性不是那么高的消息,可以不采用持久化处理以提高整体的吞吐量。

在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

尽管设置了持久化,也不能保证就一定可以持久化。这是因为在将这些持久化信息写入磁盘时也是需要时间的,如果 RabbitMQ 在这段时间内崩溃,那么这些信息也会丢失。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("persistQueue")public Queue persistQueue() {return QueueBuilder.nonDurable(Constants.PERSIST_QUEUE).build();}@Bean("persistExchange")public DirectExchange persistExchange() {return ExchangeBuilder.directExchange(Constants.PERSIST_EXCHANGE).durable(false).build();}@Bean("binding2")public Binding binding2(@Qualifier("persistExchange") Exchange exchange, @Qualifier("persistQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("persist").noargs();}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/persist")public String persist() {Message message = new Message("消费者消息确认喵~".getBytes(StandardCharsets.UTF_8), new MessageProperties());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);rabbitTemplate.convertAndSend(Constants.PERSIST_EXCHANGE, "persist", message);return "发送成功";}
}

发送方确认机制

简单介绍

在发送方将消息发送至 RabbitMQ Broker 时,也有可能出现消息丢失的情况。

为了保证消息可靠到达 Broker,RabbitMQ 提供了发送方确认机制。

发送方确认机制是指,在消息发送到 Broker 后,发送方会等待 Broker 回应,如果发送方收到消息,则发送方认为消息发送成功,如果发送方未收到消息,则发送方认为消息发送失败,可以重新发送。

RabbitMQ 提供了两种方式保证发送方发送的消息的可靠传输

  • confirm 确认模式:发送方在发送消息后,对发送方设置一个 ConfirmCallback 的监听,无论消息是否抵达 Exchange,这个监听都会被执行,如果消息抵达了 Exchange,则 ACKtrue,如果消息没有抵达 Exchange,则 ACKfalse
  • returns 退回模式:尽管确认消息发送至 Exchange 后,也依然不能完全保证消息的可靠传输。在 ExchangeQueue 会有一个 Routing Key(Binding Key) 的绑定关系,如果消息没有匹配到任何一个 Queue,则通过 returns 模式则会退回到发送方。

代码示例

confirm 确认模式

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息发送确认机制publisher-confirm-type: correlated
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("binding3")public Binding binding3(@Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with("confirm");}
}
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 设置确认消息机制rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack) {System.out.printf("接收到消息,消息ID:%s\n",correlationData == null ? null : correlationData.getId());} else {System.out.printf("未接收到消息,消息ID:%s;原因:%s\n",correlationData == null ? null : correlationData.getId(), cause);}}});return rabbitTemplate;}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "confirmRabbitTemplate")RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {// ! 直接使用 setConfirmCallback 会影响其他接口的调用// ! 且只能设置一个确认回调,多次发起请求会报错
//        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                System.out.println("执行了confirm方法");
//                if (ack) {
//                    System.out.printf("接收到消息,消息ID:%s\n",
//                            correlationData == null ? null : correlationData.getId());
//                } else {
//                    System.out.printf("未接收到消息,消息ID:%s\n;原因:%s",
//                            correlationData == null ? null : correlationData.getId(), cause);
//                }
//            }
//        });CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + "1", "confirm", "confirm test...", correlationData);return "消息发送成功";}
}

returns 退回模式

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:acknowledge-mode: auto# 消息发送退回机制publisher-returns: true
package com.ljh.extensions.rabbit.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author: Themberfue* @date: 2025/4/30 21:08* @description:*/
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// ? 设置消息退回机制rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息被退回:" + returned);}});return rabbitTemplate;}
}

总结:如何确保消息的可靠性传输

  • 发送方 => 服务端:通过发送方确认机制confirm 确认模式returns 退回模式,确保消息可靠到达。
  • 服务端:通过持久化机制,保证消息不丢失。
  • 服务端 => 接收方:通过消息确认机制,确保消息被消费者正确消费。

重试机制

简单介绍

消息在处理失败后,重新发送,重新处理,这便是消息重试机制。

RabbitMQ 提供了消息重试机制,可以设置消息最大重试次数,超过最大重试次数还未成功消费,则消息会被丢弃。

代码示例

spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://admin:admin@47.94.9.33:5672/extensionlistener:simple:
#         消息接收确认机制# acknowledge-mode: manual # 手动确认时,重发机制无效acknowledge-mode: autoretry:enabled: true # 开启重试机制initial-interval: 5000ms # 重发时间间隔max-attempts: 5 # 最大重试次数
package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}@Bean("binding4")public Binding binding4(@Qualifier("retryExchange") DirectExchange exchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("retry");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/retry")public String retry() {rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, "retry", "retry test...");return "消息发送成功";}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void process(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.RETRY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);int num = 3 / 0;System.out.println("业务处理完成");}
}

TTL 机制

简单介绍

TTL(Time To Live)机制,可以设置消息的存活时间,超过存活时间还未消费,则消息会被丢弃。

RabbitMQ 提供了 TTL 机制,可以设置队列和消息的 TTL 值,超过 TTL 值还未消费,则消息会被丢弃。

两者区别:

  • 设置队列 TTL 值,一旦消息过期,就会从队列中删除。设置队列过期时间,队列中已过期的消息肯定在队列头部,RabbitMQ 只要定期扫描对头的消息是否过期即可。
  • 设置消息 TTL 值,即使消息过期,也不会马上删除,只有在发送至消费者时才会检测其是否已经过期,如果过期才会删除。设置消息过期时间,每个消息的过期时间都可能不尽相同,所以需要扫描整个队列的消息才可确定是否过期,为了确保性能,所以采取类似于 懒汉模式 的方式。

将队列 TTL 设置为 30s,第一个消息的 TTL 设置为 30s,第二个消息的 TTL 设置为 10s。

理论上说,在 10s 后,第二个消息应该被丢弃。但由于设置了队列 TTL 值的机制,只会扫描队头的消息是否过期,所以在第一个消息过期之前,第二个消息不会被删除。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(20_000) // ? 设置队列的 TTL 值.build();}@Bean("ttlExchange")public DirectExchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();}@Bean("binding5")public Binding binding5(@Qualifier("ttlExchange") DirectExchange exchange, @Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl");}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/ttl")public String ttl() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// ? 设置消息的 TTL 值message.getMessageProperties().setExpiration("10000");return message;}};rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl test...",messagePostProcessor);return "消息发送成功";}
}

死信队列

简单介绍

死信(Dead Letter),就是因为某种原因,导致消费者消费失败的消息,称之为死信。

死信队列,当消息在一个队列中变成死信时,它就被重新被发送到另一个交换机,该交换机就是死信交换机(Dead Letter Exchange)。

该死信交换机绑定死信队列,当消息被重新发送到死信交换机时,它就被重新投递到死信队列。

消息变成死信会有如下几种原因:

  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue 参数设置为 false。
  • 消息过期。
  • 队列达到最大长度。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DlConfig {@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) // ? 配置该队列的死信交换机.deadLetterRoutingKey("dl") // ? 死信交换机绑定死信队列的 Routing Key.ttl(10_000).maxLength(10L) // ? 设置队列最大长度.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") DirectExchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dl");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void processNormal(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[%s]收到消息:%s,deliveryTag:%d\n",Constants.NORMAL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);try {int num = 3 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void processDl(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DL_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "normal test...");return "消息发送成功:" + new Date();}
}

延迟队列

简单介绍

延迟队列(Delay Queue),即消息被发送以后,并不想让消费者立刻消费该消息,而是等待一段时间后再消费。

延迟队列的使用场景有很多,比如:

  • 智能家居:智能设备产生的事件,如开关、温度变化等,可以先存放在延迟队列中,等待一段时间后再消费。
  • 日常管理:预定会议,需要在会议开始前 15 分钟通知参会人员。
  • 订单处理:订单创建后,需要 30 分钟后才会发货。

RabbitMQ 本身没有提供延迟队列的功能,但是基于消息过期后会变成死信的特性,可以通过设置 TTL 和死信队列来实现延迟队列的功能。

代码示例

@RequestMapping("/delay")
public String delay() {//发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 10s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "ttl test 20s..."+ new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s过期return messagePostProcessor;});return "发送成功!";
}

由于 RabbitMQ 检查消息是否过期的机制,如果 20s 的消息先到队列,那么 10s 的消息只会在 20s 后才会被检查到过期。

延迟队列插件

RabbitMQ 官方提供了延迟队列插件,可以实现延迟队列的功能。

延迟队列插件

安装队列插件

延迟队列插件下载地址

下载插件后,需要将插件放到 RabbitMQ 的插件目录(/usr/lib/rabbitmq/plugins)下,然后重启 RabbitMQ 服务。

代码示例

package com.ljh.extensions.rabbit.config;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).durable(true).delayed() // ? 设置队列为延迟队列.build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") DirectExchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("delay");}
}
package com.ljh.extensions.rabbit.listener;import com.ljh.extensions.rabbit.constants.Constants;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;@Component
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void processDelay(Message message) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%s:[%s]收到消息:%s,deliveryTag:%d\n",new Date(), Constants.DELAY_QUEUE, new String(message.getBody(), "UTF-8"), deliveryTag);}
}
package com.ljh.extensions.rabbit.controller;import com.ljh.extensions.rabbit.constants.Constants;
import jakarta.annotation.Resource;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "rabbitTemplate")RabbitTemplate rabbitTemplate;@RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(20_000L); // ? 设置消息的延迟发送时间return message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "delay test...", message -> {message.getMessageProperties().setDelayLong(10_000L); // ? 设置消息的延迟发送时间return message;});return "消息发送成功:" + new Date();}
}

事务机制

简单介绍

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制事务。

Spring AMQP 也提供了对事务相关的操作。RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败。

代码示例

配置事务管理器:

@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;
}@Bean
public RabbitTransactionManager rabbitTransactionManager (ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}
@Bean("transQueue")
public Queue transQueue() {return QueueBuilder.durable(Constants.TRANS_QUEUE).build();
}
@Transactional
@RequestMapping("/trans")
public String trans() {String msg = "trans test...";System.out.println("发送第一条消息:" + msg + 1);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);int a = 3 / 0;System.out.println("发送第二条消息:" + msg + 2);transRabbitTemplate.convertAndSend("", Constants.TRANS_QUEUE, msg);return "消息发送成功:";
}

消息分发

简单介绍

一个队列可以给多个消费者消费,默认情况下,RabbitMQ 是以轮询的方式将消息分发给这些消费者,不管该消费是否已经消费并且确认。

这种情况是不太合理的,如果每个消费者消费的能力都不同,有的消费者消费快,有的慢,这会极大降低整体系统的吞吐量和处理速度。

我们可以使用 channel.basicQos(int prefetchCount) 来限制当前信
道上的消费者所能保持的最大未确认消息的数量。

当该消费者达到最大的 prefetchCount 限制时,RabbitMQ 会停止向该消费者分发消息,直到该消费者的未确认消息数量小于 prefetchCount 时。

代码示例

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手动确认prefetch: 5 # 队列最大接收五条消息
@Bean("qosQueue")
public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();
}
@Bean("qosExchange")
public DirectExchange qosExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).durable(true).build();
}
@Bean("qosBinding")
public Binding qosBinding(@Qualifier("qosExchange") DirectExchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("qos");
}
@RequestMapping("/qos")
public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, "qos", "qos test...");}return "消息发送成功:" + new Date();
}
@RabbitListener(queues = Constants.QOS_QUEUE)
public void process(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(), "UTF-8"),deliveryTag);try {System.out.println("模拟处理业务逻辑");System.out.println("模拟处理业务完成");//            channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}
}

应用场景

  • 限流:可以根据消费者的处理能力,设置 prefetchCount 限制每个消费者所能接收的消息数量,从而达到限流的目的。
  • 负载均衡:通过将 prefetchCount 设置为 1,通过设 prefetch = 1 的方式,告诉 RabbitMQ 一次只给一个消费者一条消息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者。

相关文章:

RabbitMQ ④-持久化 || 死信队列 || 延迟队列 || 事务

消息确认机制 简单介绍 RabbitMQ Broker 发送消息给消费者后&#xff0c;消费者处理该消息时可能会发生异常&#xff0c;导致消费失败。 如果 Broker 在发送消息后就直接删了&#xff0c;就会导致消息的丢失。 为了保证消息可靠到达消费者并且成功处理了该消息&#xff0c;…...

十一、Hive JOIN 连接查询

作者&#xff1a;IvanCodes 日期&#xff1a;2025年5月16日 专栏&#xff1a;Hive教程 在数据分析的江湖中&#xff0c;数据往往分散在不同的“门派”&#xff08;表&#xff09;之中。要洞察数据间的深层联系&#xff0c;就需要JOIN这把利器&#xff0c;将相关联的数据串联起来…...

启用rvzi可视化自己的机器人发现joint state publisher gui没有滑块

启用rvzi可视化自己的机器人发现joint state publisher gui没有滑块&#xff1f; 解决方法&#xff1a; 一&#xff1a;查看urdf中joint type定义是不是revolute。 二&#xff1a;查看urdf关节限制是不是正确&#xff0c;如果是0到0则不正确。 <joint name"joint_…...

Gitee DevOps:中国企业数字化转型的“本土化加速器“

在数字化浪潮席卷全球的当下&#xff0c;DevOps已经从技术热词转变为企业的核心生产力工具。根据IDC最新报告&#xff0c;到2025年中国DevOps市场规模将达到15亿美元&#xff0c;年复合增长率高达25%。在这一快速增长的市场中&#xff0c;一个显著趋势正在显现&#xff1a;越来…...

RKNN开发环境搭建(ubuntu22.04)

以下情况在RV1106G3的平台上验证正常。 1、conda安装 1&#xff09;conda --version//确认是否安装 2&#xff09;创建一个安装目录&#xff0c;进行下一步 3&#xff09;wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-4.6.14-Linux-x…...

2025年上软考 考试时间+准考证打印全攻略

各位同学好呀&#xff01;我是你们的老朋友大老李~之前咱们聊过软考报名和机考模拟系统的使用技巧。今天要和大家分享一下&#xff0c;最近好多同学私信的问题&#xff1a;"老李&#xff01;准考证到底哪天能打印&#xff1f;""考试时间会不会有变动&#xff1f…...

vllm量化05—FP8 W8A8

本系列基于Qwen2.5-7B&#xff0c;学习如何使用vllm量化&#xff0c;并使用benchmark_serving.py、lm_eval 测试模型性能和评估模型准确度。 测试环境为&#xff1a; OS: centos 7 GPU: nvidia l40 driver: 550.54.15 CUDA: 12.3本文是该系列第5篇——FP8 W8A8 一、量化 fro…...

OpenCV 图像透视变换详解

在计算机视觉领域&#xff0c;图像的视角问题常常会影响后续的分析与处理。例如&#xff0c;从倾斜角度拍摄的文档、带有畸变的场景图像等&#xff0c;都需要通过特定的方法进行矫正。OpenCV 作为计算机视觉领域的重要库&#xff0c;提供了强大的图像透视变换功能&#xff0c;能…...

Vue主题色切换实现方案(CSS 变量 + 类名切换)

定义主题变量 // styles/themes.scss :root {--primary-color: #314099;--secondary-color: #1E3968;--text-color: #2c44ce; }[data-theme"红系主题"] {--primary-color: #d74146;--secondary-color: #c20707;--text-color: #db3b3b; }在组件中使用变量 <templ…...

MySQL 高可用

目录 一、概述MySQL高可用 1. 什么是高可用性&#xff08;High Availability, HA&#xff09;&#xff1f; 2. MySQL 高可用常见方案 3. 高可用核心机制 4. 选型建议 注意事项 二、案例环境组成 MySQL 主主复制 keepalived HAProxy 的高可用负载均衡架构 三、案例架构…...

C#学习教程(附电子书资料)

概述 C#&#xff08;读作"C Sharp"&#xff09;是一种由微软开发的现代编程语言&#xff0c;结合了C的高效性和Java的简洁性&#xff0c;专为.NET框架设计。以下是其核心特性和应用领域的详细介绍电子书资料&#xff1a;https://pan.quark.cn/s/6fe772420f95 一、语…...

MySQL性能优化

目录 一、索引优化 1、慢查询日志分析 2、EXPLAIN 执行计划分析 3、索引类型选择 4、索引使用原则 5、常见索引失效场景 二、SQL语句优化 1、避免低效操作符 2、减少数据扫描量 3、子查询优化 4、其他高频优化技巧 三、表设计优化 1、数据类型优化 四、架构设计优…...

Mendix 中的XPath 令牌(XPath Tokens)详解

在 Mendix 中&#xff0c;XPath 令牌&#xff08;XPath Tokens&#xff09; 是一种特殊的动态参数化查询技术&#xff0c;允许你在 XPath 表达式中使用变量或上下文相关的值&#xff0c;从而实现更灵活的查询逻辑。 1. 什么是 XPath 令牌&#xff1f; XPath 令牌是 Mendix 提…...

Feign异步模式丢失上下文问题

Feign异步模式丢失上下文问题 问题描述 当我们使用异步对我们代码进行操作优化时&#xff0c;代码中使用了RequestContextHolder去获取上下文的数据&#xff0c;当我们执行原来可以执行的业务时发现报了空指针异常或数据为空&#xff0c;这是为什么呢&#xff1f; 原理解释 …...

保姆教程-----安装MySQL全过程

1.电脑从未安装过mysql的&#xff0c;先找到mysql官网&#xff1a;MySQL :: Download MySQL Community Server 然后下载完成后&#xff0c;找到文件&#xff0c;然后双击打开 2. 选择安装的产品和功能 依次点开“MySQL Servers”、“MySQL Servers”、“MySQL Servers 5.7”、…...

BUFDS_GTE2,IBUFDS,BUFG缓冲的区别

1、IBUFDS_GTE2 这是 Xilinx FPGA 中专门为 高速收发器&#xff08;SerDes/GTX/GTH/GTY&#xff09;参考时钟设计的差分输入缓冲器。 主要功能是将外部的差分时钟信号&#xff08;如LVDS、LVPECL等&#xff09;转换为FPGA内部的单端时钟信号&#xff0c;并保证信号的完整性和高…...

FPGA: XILINX Kintex 7系列器件的架构

本文将详细介绍Kintex-7系列FPGA器件的架构。以下内容将涵盖Kintex-7的核心架构特性、主要组成部分以及关键技术&#xff0c;尽量全面且结构化&#xff0c;同时用简洁的语言确保清晰易懂。 Kintex-7系列FPGA架构概述 Kintex-7是Xilinx 7系列FPGA中的中高端产品线&#xff0c;基…...

c/c++的opencv的图像预处理讲解

OpenCV 图像预处理核心技术详解 (C/C) 图像预处理是计算机视觉任务中至关重要的一步。原始图像往往受到噪声、光照不均、尺寸不一等多种因素的影响&#xff0c;直接用于后续分析&#xff08;如特征提取、目标检测、机器学习模型训练等&#xff09;可能会导致性能下降或结果不准…...

索恩格汽车SEG Automotive EDI 需求分析

SEG Automotive&#xff08;索恩格汽车&#xff09;是一家全球领先的汽车电气化系统供应商&#xff0c;专注于为传统内燃机和新能源车辆提供高效、可持续的动力解决方案。 EDI 在汽车行业的重要性 在汽车制造行业&#xff0c;高效的供应链是精益生产的核心。精益生产强调“按…...

【简单模拟实现list】

在C标准模板库&#xff08;STL&#xff09;中&#xff0c;list是一个非常强大的容器&#xff0c;它基于双向链表实现&#xff0c;支持高效的插入和删除操作。虽然我们可以直接使用STL中的list&#xff0c;但通过自己模拟实现一个list&#xff0c;可以更好地理解其背后的原理和数…...

深入解析ZAB协议:ZooKeeper的分布式一致性核心

引言 在分布式系统中&#xff0c;如何高效、可靠地实现多节点间的数据一致性是核心挑战之一。ZAB协议&#xff08;ZooKeeper Atomic Broadcast&#xff09;作为 ZooKeeper的核心算法&#xff0c;被广泛应用于分布式协调服务&#xff08;如Kafka、HBase、Dubbo等&#xff09;。…...

交叉熵损失函数,KL散度, Focal loss

交叉熵损失函数&#xff08;Cross-Entropy Loss&#xff09; 交叉熵损失函数&#xff0c;涉及两个概念&#xff0c;一个是损失函数&#xff0c;一个是交叉熵。 首先&#xff0c;对于损失函数。在机器学习中&#xff0c;损失函数就是用来衡量我们模型的预测结果与真实结果之间…...

k8s部署实战-springboot应用部署

在 Kubernetes 上部署 SpringBoot 应用实战指南 前言 本文将详细介绍如何将一个 SpringBoot 应用部署到 Kubernetes 集群中,包括制作镜像、编写部署文件、创建服务等完整步骤。 准备工作 1. 示例 SpringBoot 应用 假设我们有一个简单的 SpringBoot 应用,提供 REST API 服…...

快速选择算法:优化大数据中的 Top-K 问题

在处理海量数据时&#xff0c;经常会遇到这样的需求&#xff1a;找出数据中最大的前 K 个数&#xff0c;而不必对整个数据集进行排序。这种场景下&#xff0c;快速选择算法&#xff08;Quickselect&#xff09;就成了一个非常高效的解决方案。本文将通过一个 C 实现的快速选择算…...

uniapp-商城-60-后台 新增商品(属性的选中和页面显示)

前面添加了属性&#xff0c;添加属性的子级项目。也分析了如何回显&#xff0c;但是在添加新的商品的时&#xff0c;我们也同样需要进行选择&#xff0c;还要能正常的显示在界面上。下面对页面的显示进行分析。 1、界面情况回顾 属性显示其实是个一嵌套的数据显示。 2、选中的…...

利用 Amazon Bedrock Data Automation(BDA)对视频数据进行自动化处理与检索

当前点播视频平台搜索功能主要是基于视频标题的关键字检索。对于点播平台而言&#xff0c;我们希望可以通过优化视频搜索体验满足用户通过模糊描述查找视频的需求&#xff0c;从而提高用户的搜索体验。借助 Amazon Bedrock Data Automation&#xff08;BDA&#xff09;技术&…...

项目QT+ffmpeg+rtsp(一)——Qt的安装和rtsp的测试

文章目录 一、Qt安装二、插件配置tool与卸载三、下载ffmpeg四、查看能否使用(视频)五、代码复现5.1 rtsp申请5.2 rtsp在线测试5.3代码修改六、结果一、Qt安装 对于QT中5.12版本之后,都是使用在线版本,如果你想安装某一个的历史在线版本,一定要点击archive,不然显示不出来…...

高速光耦在通信行业的应用(五) | 5Mbps通信光耦的特性

针对5MBd速率光耦市场&#xff0c;晶台推出KL2200、KL2201和KL2202系列光耦 ,对标大部分国外品牌产品的应用&#xff1b;它分别由一个红外发射二极管和一个高速集成光电检测器逻辑门组成。 它采用 8 引脚 DIP 封装&#xff0c;并提供 SMD 选项。KL2200 的检测器具有一个三态输出…...

#跟着若城学鸿蒙# web篇-运动和方向传感器监测

前言 有些前端业务场景需要用到一些传感器&#xff0c;比如运动传感器和方向传感器来实现摇一摇功能。这就需要前端能够直接获取到相关数据&#xff0c;而不是通过 js 调用客户端代码来实现。 权限 还是需要在模块的module.json5文件中添加相关权限 {"name" : &qu…...

【匹配】Hirschberg

Hirschberg 文章目录 Hirschberg1. 算法介绍2. 公式及原理3. 伪代码 1. 算法介绍 背景与目标 Hirschberg 算法由 Dan Hirschberg 于1975年提出&#xff0c;是对 Needleman–Wunsch 全局比对的内存优化&#xff0c;通过分治策略将空间复杂度从 O ( m n ) O(mn) O(mn) 降到 O (…...

如何在 Windows 上安装类似 Synaptic 的 Chocolatey GUI 包管理器

如果你正在寻找类似 Linux 中 APT 的 Windows 包管理器&#xff0c;那么没有什么比 Chocolatey 更好的了。它是 Windows 10 上可用的最佳包管理器之一&#xff0c;可以通过命令行界面安装所有流行的软件和工具。然而&#xff0c;这并不意味着如果你不喜欢命令行&#xff0c;你就…...

激活函数全解析:定义、分类与 17 种常用函数详解

一、激活函数的定义与作用 定义&#xff1a; 激活函数是添加到人工神经网络中的函数&#xff0c;用于帮助网络学习数据中的复杂模式&#xff0c;决定神经元的输出。 核心作用&#xff1a; 为神经网络引入非线性&#xff0c;增强模型表达能力。需可微分&#xff08;或近似可微&…...

1-10 目录树

在ZIP归档文件中&#xff0c;保留着所有压缩文件和目录的相对路径和名称。当使用WinZIP等GUI软件打开ZIP归档文件时&#xff0c;可以从这些信息中重建目录的树状结构。请编写程序实现目录的树状结构的重建工作。 输入格式: 输入首先给出正整数N&#xff08;≤104&#xff09;…...

Python OOP核心技巧:如何正确选择实例方法、类方法和静态方法

Python方法类型全解析&#xff1a;实例方法、类方法与静态方法的使用场景 一、三种方法的基本区别二、访问能力对比表三、何时使用实例方法使用实例方法的核心场景&#xff1a;具体应用场景&#xff1a;1. 操作实例属性2. 对象间交互3. 实现特定实例的行为 四、何时使用类方法使…...

RK3588 ADB使用

安卓adb操作介绍 adb&#xff08;Android Debug Bridge&#xff09;是一个用于与安卓设备进行通信和控制的工具。adb可以通过USB或无线网络连接安卓设备&#xff0c;执行各种命令&#xff0c;如安装和卸载应用&#xff0c;传输文件&#xff0c;查看日志&#xff0c;运行shell命…...

ubuntu环境下 基于Python 打包的 批量命令行可视化操作工具 GUI

文章目录 一.需求&#xff1a;二.原理支撑&#xff1a;三.简单Demo四.封装成GUI1.依赖库2.代码 五.打包成可执行文件六.命令行的配置七.运行效果 一.需求&#xff1a; 作为测试工程师&#xff0c;为了到现场高效的调试&#xff0c;部署工作&#xff0c;需要一个可视化的工具&a…...

大语言模型 10 - 从0开始训练GPT 0.25B参数量 补充知识之模型架构 MoE、ReLU、FFN、MixFFN

写在前面 GPT&#xff08;Generative Pre-trained Transformer&#xff09;是目前最广泛应用的大语言模型架构之一&#xff0c;其强大的自然语言理解与生成能力背后&#xff0c;是一个庞大而精细的训练流程。本文将从宏观到微观&#xff0c;系统讲解GPT的训练过程&#xff0c;…...

SkyWalking的工作原理和搭建过程

SkyWalking 是一个开源的 应用性能监控系统&#xff08;APM&#xff09;&#xff0c;专为云原生、微服务架构设计。其核心原理基于 分布式追踪&#xff08;Distributed Tracing&#xff09;、指标收集&#xff08;Metrics Collection&#xff09; 和 日志关联&#xff08;Log C…...

CMS(plone / joomla 搭建测试)

开源选择 wordpress 用得最多 也是最容易有漏洞被攻击 被挂木马的 joomla &#xff08;JMS多站点&#xff1a;商业扩展&#xff09; — 多站点需付费 Drupal ProcessWire Plone因其内置的强大安全特性和较少的用户基础&#xff08;相比 WordPress 和 Joomla&#xff09;&#…...

基于 Flink 的实时推荐系统:从协同过滤到多模态语义理解

基于 Flink 的实时推荐系统&#xff1a;从协同过滤到多模态语义理解 嘿&#xff0c;各位技术小伙伴们&#xff01;在这个信息爆炸的时代&#xff0c;你是不是常常惊叹于各大平台仿佛能 “读懂你的心”&#xff0c;精准推送你感兴趣的内容呢&#xff1f;今天&#xff0c;小编就…...

Flink SQL、Hudi 、Doris在数据上的组合应用

Flink SQL、Hudi 和 Doris 是大数据领域中不同定位的技术组件&#xff0c;各自解决不同的问题&#xff0c;以下从核心定位、关键特性和典型场景三个维度展开说明&#xff1a; 1. Flink SQL&#xff1a;流批统一的实时计算引擎 核心定位&#xff1a;Flink 是 Apache 顶级的流批…...

Flink运维要点

一、Flink 运维核心策略 1. 集群部署与监控 资源规划 按业务优先级分配资源&#xff1a;核心作业优先保障内存和 CPU&#xff0c;避免资源竞争。示例&#xff1a;为实时风控作业分配专用 TaskManager&#xff0c;配置 taskmanager.memory.process.size8g。 监控体系 集成 Prom…...

VSCode + Cline AI辅助编程完全指南

VSCode Cline AI辅助编程完全指南 在当今AI快速发展的时代&#xff0c;程序员可以通过AI工具极大地提高工作效率。本教程将详细介绍如何使用VSCode结合Cline&#xff08;Claude AI助手&#xff09;进行AI辅助编程&#xff0c;帮助你提高开发效率&#xff0c;解决复杂问题。 …...

【源码级开发】Qwen3接入MCP,企业级智能体开发实战!

Qwen3接入MCP智能体开发实战&#xff08;上&#xff09; 一、MCP技术与Qwen3原生MCP能力介绍 1.智能体开发核心技术—MCP 1.1 Function calling技术回顾 如何快速开发一款智能体应用&#xff0c;最关键的技术难点就在于如何让大模型高效稳定的接入一些外部工具。而在MCP技术…...

回调函数应用示例

回调函数是一种通过函数指针&#xff08;或引用&#xff09;调用的函数&#xff0c;它在特定事件或条件发生时被另一个函数调用。回调函数的核心思想是将函数作为参数传递&#xff0c;以便在适当的时候执行自定义逻辑&#xff0c;常用于异步编程、事件驱动架构等场景。 业务场景…...

R语言如何解决导出pdf中文不显示的问题

前言 以前绘图都默认英文&#xff0c;突然要求都改成中文&#xff0c;呆住。。。。。。。。。 标题代码实现 ### 导入工具包 ### library(readr) library(dplyr) library(corrplot)df <- read_csv("./clinical.csv") df <- df %>% select(-id, -label)##…...

国产linux系统(银河麒麟,统信uos)使用 PageOffice自定义Word模版中的数据区域

​ PageOffice 国产版 &#xff1a;支持信创系统&#xff0c;支持银河麒麟V10和统信UOS&#xff0c;支持X86&#xff08;intel、兆芯、海光等&#xff09;、ARM&#xff08;飞腾、鲲鹏、麒麟等&#xff09;、龙芯&#xff08;Mips、LoogArch&#xff09;芯片架构。 在实际的Wor…...

llamafactory SFT 从断点恢复训练

背景 我使用llamafactory sft 微调模型的时候。gpu停止运行了。日志文件没有任何的报错信息。 显存还是占用状态。 查看llamafactory的进程是下述信息&#xff1a; 151312 151306 91 17:42 ? 03:58:10 [llamafactory-cl] 既然如此&#xff0c;那就只能从断点恢复训练了。 …...

C#里使用Prism.Core的例子

由于使用WPF来开发应用程序, 那么就会使用一些框架程序来加速开发,一般会使用Prism.Core来加速。 这个应用最后运行的显示如下: 第一步需要安装下面的包: <?xml version="1.0" encoding="utf-8"?> <packages><package id="Mi…...

【MySQL】数据库三大范式

目录 一. 什么是范式 二. 第一范式 三. 第二范式 不满足第二范式时可能出现的问题 四. 第三范式 一. 什么是范式 在数据库中范式其实就是一组规则&#xff0c;在我们设计数据库的时候&#xff0c;需要遵守不同的规则要求&#xff0c;设计出合理的关系型数据库&#xff0c;…...