当前位置: 首页 > news >正文

使用 Redis Stream 结合 Redission DelayedQueue 实现延迟消息队列(已上线生产)

一、Redis Stream 介绍

Redis Stream 是 Redis 5.0 版本新增加的数据结构,作为一种轻量级的消息队列,Redis Stream 作为一种轻量级的消息队列,适合资源有限或对性能要求较高的场景。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

二、如何实现延迟消息队列

我的方案是使用Redis的阻塞队列和延迟队列将消息存进去,当延迟时间到达时,将消息发送至Redis Stream消息队列即可。

Redis延迟队列原理如下:

  • Redisson延迟队列使用三个结构来存储,一个是queueName的list,值是添加的元素;一个是timeoutSetName的zset,值是添加的元素,score为timeout值;还有一个是getName()的blockingQueue,值是到期的元素。

take 方法用于从延迟队列拿取消息,如果延迟时间未到是拿不到的,会一直阻塞线程,直到到达指定时间,便会返回消息数据。

    /*** 从目标队列中取出头部元素(为空时阻塞线程)** @param queueName 队列名* @return 队列头部元素*/public static String take(@NonNull String queueName) throws InterruptedException {RBlockingQueue<String> targetQueue = CLIENT.getBlockingQueue(queueName);CLIENT.getDelayedQueue(targetQueue);return targetQueue.take();}

监听延迟队列,时间到达时推送至Redis Stream消息队列:

    /*** 时间到期后将消息推到 stream 消息队列** @param streamKey 队列名称* @param content   消息内容*/@SuppressWarnings("unchecked")private static void pushToStreamAfterExpire(String streamKey, String content) {// 开启一个新线程处理延迟消息,防止阻塞主线程new Thread(() -> {while (true) {String messages = "";try {// 从延迟消息队列拿取消息messages = take(streamKey);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.info("(线程被中断) Stream 键:{}", streamKey);}// 推送至streamMap<String, Object> messageMap = JSONUtil.toBean(messages, Map.class);RedisStreamUtil.push(RedisStreamUtil.BASE_STREAM_NAME + streamKey, messageMap);log.info("(发布延迟消息到 Redis Stream 成功) Stream 键:{},消息值:{}", streamKey, content);}}, "redis_delay_queue").start();}

三、代码实战(Java)

1、对Redis Stream做配置,主要做了以下几件事:
  • 初始化线程池
  • 对监听器容器做初始化
  • 监听器订阅消息主题和消费者组
  • 对未成功消费的消息定时做重试补偿机制
package com.ricent.common.redis.stream.config;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.ObjectUtil;
import com.ricent.common.redis.stream.listener.BaseRedisStreamMessageListener;
import com.ricent.common.redis.stream.util.RedisStreamUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.StreamGroup;
import org.redisson.api.StreamMessageId;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.StreamCreateGroupArgs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** redis stream消息队列配置** @author clq* @since 2024/11/11*/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class RedisStreamConfig {private final List<StreamListener<String, MapRecord<String, String, String>>> listeners;private final List<BaseRedisStreamMessageListener> baseListeners;private final ConcurrentMap<String, Map<String, String>> streamGroups = new ConcurrentHashMap<>();private final RedisConnectionFactory redisConnectionFactory;private static final String BASE_STREAM_NAME = "redis_stream:";@Beanpublic StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() {// 初始化线程池AtomicInteger index = new AtomicInteger(1);int processors = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque<>(), r -> {Thread thread = new Thread(r);thread.setName("redis-stream-consumer-thread-" + index.getAndIncrement());thread.setDaemon(true);return thread;});// 初始化消息监听容器StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(3)).batchSize(5).serializer(new StringRedisSerializer()).errorHandler(e -> log.error(e.getMessage(), e)).executor(executor).build();// 创建容器StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =StreamMessageListenerContainer.create(redisConnectionFactory, options);container.start();log.info("Redis Stream Message Listener Container initialized and started");return container;}/*** 监听器配置* 消费者组名称为 stream名称_group* 消费者名称为 stream名称_consumer** @param container 监听容器* @return 所有监听者的订阅信息*/@Beanpublic List<Subscription> subscriptions(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) {return listeners.stream().map(listener -> {BaseRedisStreamMessageListener baseListener = (BaseRedisStreamMessageListener) listener;String streamName = CharSequenceUtil.toUnderlineCase(baseListener.getStreamName());String groupName = streamName + "_group";String consumerName = streamName + "_consumer";String finalStreamName = BASE_STREAM_NAME + streamName;// 初始化消息队列和消费者组initStreamAndGroup(finalStreamName, groupName);// 记录所有消费主题和消费者组及消费者名称streamGroups.computeIfAbsent(finalStreamName, k -> new ConcurrentHashMap<>()).put(groupName, consumerName);// 订阅消费者组log.info("为 stream: '{}' with group: '{}' and consumer: '{}' 创建订阅", finalStreamName, groupName, consumerName);return container.register(streamReadRequest(finalStreamName, groupName, consumerName), listener);}).toList();}/*** 自定义 stream 读取消息请求* (使用默认的 container.receive() 方法遇到异常或 redis 重启会取消订阅,导致监听器后续收不到消息)** @param streamName   主题名称* @param groupName    消费者组名称* @param consumerName 消费者名称* @return Request to read a Redis Stream.*/private StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest(String streamName, String groupName, String consumerName) {return StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamName, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).cancelOnError(t -> {if (Boolean.FALSE.equals(RedisStreamUtil.hasKey(streamName))) {log.error("redis stream :【消息主题:{} 被删除,已取消订阅!】", streamName);return true;}RedisConnection connection = redisConnectionFactory.getConnection();if (ObjectUtil.notEqual("PONG", connection.ping())) {log.error("----- redis 连接断开,请检查 redis 客户端状态! -----");}return false;}).autoAcknowledge(false).build();}/*** 初始化 stream 和 创建消费者组** @param streamName 名称* @param groupName  消费者组名称*/private void initStreamAndGroup(String streamName, String groupName) {RStream<String, Object> stream = RedisStreamUtil.CLIENT.getStream(streamName);// 检查 stream 是否存在if (!stream.isExists()) {log.info("Creating stream: '{}'", streamName);// 创建主题StreamMessageId messageId = stream.add(StreamAddArgs.entries(Map.of("initialField", "initialValue")));// 创建消费组stream.createGroup(StreamCreateGroupArgs.name(groupName));// 删除初始化消息stream.remove(messageId);log.info("Stream: '{}' 和 group: '{}' 初始化完成.", streamName, groupName);}// 检查消费者组是否存在try {List<StreamGroup> existingGroups = stream.listGroups();boolean groupExists = existingGroups.stream().anyMatch(group -> group.getName().equals(groupName));if (!groupExists) {log.info("Creating consumer group: '{}' for stream: '{}'", groupName, streamName);stream.createGroup(StreamCreateGroupArgs.name(groupName));log.info("消费者组: '{}' for stream: '{}' 创建完成.", groupName, streamName);}} catch (Exception e) {log.error("Error while ensuring consumer group exists for stream: '{}'", streamName, e);}}/*** 每 15分钟进行一次心跳检测,并对 pending的任务进行处理*/@Bean(destroyMethod = "shutdown")public ScheduledExecutorService pendingMessagesChecker() {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);Runnable pendingMessagesTask = () -> {try {for (Map.Entry<String, Map<String, String>> entry : streamGroups.entrySet()) {// 消息主题String streamName = entry.getKey();Map<String, String> groups = entry.getValue();for (Map.Entry<String, String> groupEntry : groups.entrySet()) {// 消费者组String groupName = groupEntry.getKey();// 消费者String consumerName = groupEntry.getValue();// 对 pending 的消息进行处理baseListeners.stream().filter(i -> streamName.equals(BASE_STREAM_NAME + i.getStreamName())).findFirst().ifPresent(messageListener -> {Map<StreamMessageId, Map<String, Object>> messageIdMap = RedisStreamUtil.fetchNotAckAndConsume(streamName, groupName, consumerName,60L, i -> messageListener.consume(streamName, i.getValue()));if (CollUtil.isNotEmpty(messageIdMap)) {log.info("主题 {} 已成功处理{}条 pending 消息!消息体为:【{}】", streamName, messageIdMap.size(), messageIdMap);}});}}} catch (Exception e) {log.error("Error while checking pending messages", e);}};// 第一次任务立即执行,之后每15分钟执行一次executorService.scheduleWithFixedDelay(pendingMessagesTask, 0, 15, TimeUnit.MINUTES);return executorService;}
}
2、配置基类监听器,基类监听器实现了 StreamListener 接口,并且定义了 consume 方法,该方法用于实现具体的业务逻辑,后续监听器只需要继承 BaseRedisStreamMessageListener 类,重写consume方法即可。
package com.ricent.common.redis.stream.listener;import com.ricent.common.redis.stream.util.RedisStreamUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.StreamMessageId;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Objects;/*** redis stream 消息监听器基类** @author clq* @since 2024/11/11*/
@Data
@Slf4j
@Component
public abstract class BaseRedisStreamMessageListener implements StreamListener<String, MapRecord<String, String, String>> {protected String streamName;@Overridepublic void onMessage(MapRecord<String, String, String> message) {log.info("--- redis stream 接收到消息 ---  message :{}", message);try {// 消息消费Map<String, Object> objectMap = processMap(message.getValue());consume(streamName, objectMap);// 消息确认ackMessage(message);} catch (Exception e) {log.error("Error while consuming message from stream: '{}'. Message: {}", streamName, message, e);}}/*** 具体的消费逻辑** @param streamName 主题* @param message    消息体*/public abstract void consume(String streamName, Map<String, Object> message);/*** 消息确认机制** @param message 消息内容*/private void ackMessage(MapRecord<String, String, String> message) {RecordId id = message.getId();String finalStreamName = RedisStreamUtil.BASE_STREAM_NAME + streamName;String groupName = streamName + "_group";boolean isValidRecordId = Objects.nonNull(id.getTimestamp()) && Objects.nonNull(id.getSequence());if (isValidRecordId) {StreamMessageId messageId = new StreamMessageId(id.getTimestamp(), id.getSequence());Long ackCount = RedisStreamUtil.ack(finalStreamName, groupName, messageId);log.info("已确认 : {} 条消息,消息Id为 {}", ackCount, messageId);}}/*** 将 Map<String,String> 转换成 Map<String,Object>* 去除 value 反序列化后多余的引号** @param map 处理前消息体* @return 处理后的消息体*/public static Map<String, Object> processMap(Map<String, String> map) {Map<String, Object> resultMap = new HashMap<>();for (Map.Entry<String, String> entry : map.entrySet()) {String key = entry.getKey();String value = entry.getValue();// 检查value是否以两个引号开头和结尾if (value.startsWith("\"") && value.endsWith("\"")) {// 去除第一个和最后一个引号value = value.substring(1, value.length() - 1);}resultMap.put(key, value);}return resultMap;}
}
3、以下是两个工具类,分别是 RedisStreamUtil 和 RedisDelayQueueUtil
package com.ricent.common.redis.stream.util;import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.extra.spring.SpringUtil;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.*;
import org.redisson.api.stream.StreamAddArgs;
import org.redisson.api.stream.StreamCreateGroupArgs;
import org.redisson.api.stream.StreamReadGroupArgs;
import org.redisson.client.codec.LongCodec;
import org.springframework.data.redis.connection.stream.MapRecord;import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;/*** redis stream 消息队列工具类** @author clq* @since 2024/11/11*/
@Slf4j
@SuppressWarnings("unused")
@UtilityClass
public class RedisStreamUtil {public static final String BASE_STREAM_NAME = "redis_stream:";public static final RedissonClient CLIENT = SpringUtil.getBean(RedissonClient.class);/*** 向指定的 Redis Stream 中添加消息** @param streamName Stream 的名称* @param message    要添加的消息内容* @return 添加的消息的 ID*/public static StreamMessageId push(String streamName, Map<String, Object> message) {MapRecord<String, String, Object> streamRecord = MapRecord.create(streamName, message);RStream<String, Object> stream = CLIENT.getStream(streamName);return stream.add(StreamAddArgs.entries(streamRecord.getValue()));}/*** 向指定的 Redis Stream 中添加消息** @param message 要添加的消息内容* @return 添加的消息的 ID*/public static StreamMessageId push(MapRecord<String, String, Object> message) {RStream<String, Object> stream = CLIENT.getStream(message.getStream());return stream.add(StreamAddArgs.entries(message.getValue()));}/*** 从指定的 stream 中读取从未读取的消息* 使用默认消费者组:streamName_group* 使用默认消费者:streamName_consumer*/public static Map<StreamMessageId, Map<String, Object>> pull(String streamName) {String groupName = MessageFormat.format("{0}_group", CharSequenceUtil.toUnderlineCase(streamName));String consumerName = MessageFormat.format("{0}_consumer", CharSequenceUtil.toUnderlineCase(streamName));return pull(streamName, groupName, consumerName, null);}/*** 从指定的 stream 中读取从未读取的消息* 使用默认消费者组:streamName_group* 使用默认消费者:streamName_consumer*/public static Map<StreamMessageId, Map<String, Object>> pull(String streamName, Consumer<MapRecord<String, String, Object>> consumer) {String groupName = CharSequenceUtil.toUnderlineCase(streamName) + "_group";String consumerName = CharSequenceUtil.toUnderlineCase(streamName) + "_consumer";return pull(streamName, groupName, consumerName, consumer);}/*** 从指定的 stream 中读取从未读取的消息(可以指定消费者组和消费者)** @param streamName   Stream 的名称* @param groupName    消费者组名称* @param consumerName 消费者的名称* @return 轮询到的消息列表*/public static Map<StreamMessageId, Map<String, Object>> pull(String streamName, String groupName, String consumerName, Consumer<MapRecord<String, String, Object>> consumer) {// 获取 Redis 消息队列RStream<String, Object> redisMsgQueue = CLIENT.getStream(streamName);// 设置默认的消费者组和消费者名称groupName = CharSequenceUtil.isBlank(groupName) ? CharSequenceUtil.toUnderlineCase(streamName) + "_group" : groupName;consumerName = CharSequenceUtil.isBlank(consumerName) ? CharSequenceUtil.toUnderlineCase(streamName) + "_consumer" : consumerName;try {// 创建消费者组List<String> nameList = redisMsgQueue.listGroups().stream().map(StreamGroup::getName).toList();if (!nameList.contains(groupName)) {redisMsgQueue.createGroup(StreamCreateGroupArgs.name(groupName));}// 读取消息Map<StreamMessageId, Map<String, Object>> messages = redisMsgQueue.readGroup(groupName, consumerName, StreamReadGroupArgs.neverDelivered());// 处理消息for (Map.Entry<StreamMessageId, Map<String, Object>> e : messages.entrySet()) {StreamMessageId id = e.getKey();Map<String, Object> recordMap = e.getValue();MapRecord<String, String, Object> mapRecord = MapRecord.create(streamName, recordMap);if (consumer != null) {consumer.accept(mapRecord);redisMsgQueue.ack(groupName, id);}}return new HashMap<>(messages);} catch (Exception e) {log.error("Error while pulling messages from Redis stream", e);throw e;}}/*** 从指定的消费者组中获取未确认的消息并处理** @param streamName   Stream 的名称* @param groupName    消费者组名称* @param consumerName 消费者的名称* @param ackTimeout   确认超时时间(秒)* @param consumer     消息处理回调* @return 未确认的消息列表*/public static Map<StreamMessageId, Map<String, Object>> fetchNotAckAndConsume(String streamName, String groupName, String consumerName, long ackTimeout, Consumer<MapRecord<String, String, Object>> consumer) {Map<StreamMessageId, Map<String, Object>> output = Collections.emptyMap();// 获取消费者组的待处理消息信息RStream<String, Object> redisMsgQueue = CLIENT.getStream(streamName);PendingResult pr = redisMsgQueue.getPendingInfo(groupName);if (pr.getTotal() == 0) {return output;}// 从最低 ID 开始读取消息StreamMessageId curId = pr.getLowestId();StreamMessageId maxId = pr.getHighestId();while (true) {// 读取一批待处理消息List<PendingEntry> entries = redisMsgQueue.listPending(groupName, curId, maxId, 100);if (entries.isEmpty()) {return output;}// 更新当前读取的 IDcurId = makeExclusive(entries.get(entries.size() - 1).getId());// 声明超时的待处理消息StreamMessageId[] messageIds = entries.stream().map(PendingEntry::getId).toArray(StreamMessageId[]::new);Optional<Map<StreamMessageId, Map<String, Object>>> claimOptional = Optional.ofNullable(redisMsgQueue.claim(groupName, consumerName, ackTimeout, TimeUnit.SECONDS, messageIds));// 处理每条消息claimOptional.ifPresent(claim -> claim.forEach((id, message) -> {log.info("NotACKMessage ID: {}, Message: {}", id, message);MapRecord<String, String, Object> mapRecord = MapRecord.create(streamName, message);if (consumer != null) {consumer.accept(mapRecord);long ackCount = redisMsgQueue.ack(groupName, id);if (ackCount > 0) {log.info("Acked message ID: {}, Message: {}", id, message);}}}));}}/*** 判断 stream 是否存在** @param streamName Stream 的名称* @return 如果流存在则返回 true,否则 false*/public static Boolean hasKey(String streamName) {RBucket<String> bucket = CLIENT.getBucket(streamName);return bucket.isExists();}/*** 确认消费** @param streamName Stream 的名称* @param groupName  消费者组* @param recordIds  消息id* @return java.lang.Long*/public static Long ack(String streamName, String groupName, StreamMessageId... recordIds) {RStream<String, Object> stream = CLIENT.getStream(streamName);return stream.ack(groupName, recordIds);}/*** 删除消费队列,及其消费者组** @param streamName Stream 的名称* @return 是否成功删除*/public static <K, V> Boolean delete(String streamName) {RStream<K, V> stream = CLIENT.getStream(streamName);return stream.delete();}/*** 移除消费者组的消息id** @param streamName Stream 的名称* @param ids        消息 id 列表* @return 是否成功删除*/public static <K, V> Long remove(String streamName, StreamMessageId... ids) {RStream<K, V> stream = CLIENT.getStream(streamName);return stream.remove(ids);}/*** 生成一个唯一的 StreamMessageId** @param id 原始的 StreamMessageId* @return 生成的唯一的 StreamMessageId*/private static StreamMessageId makeExclusive(StreamMessageId id) {return new StreamMessageId(id.getId0(), id.getId1() + 1);}/*** 根据指定的 lastSize 缩减 Redis Stream 的大小** @param streamName       stream 的名称* @param lastSize         缩减后的大小* @param queueSizeMapName 用于记录缩减后的大小的哈希表名称* @return 缩减后的大小*/public static long shrinkByLastSize(String streamName, long lastSize, String queueSizeMapName) {// lua 脚本String shrinkScript = """local nowSz = redis.call('XLEN', KEYS[1]);redis.call('XTRIM', KEYS[1], 'MAXLEN', nowSz - tonumber(ARGV[1]));local leftSz = redis.call('XLEN', KEYS[1]);redis.call('HSET', KEYS[2], KEYS[1], leftSz);return leftSz;""";RScript rScript = CLIENT.getScript(LongCodec.INSTANCE);return rScript.eval(RScript.Mode.READ_WRITE, shrinkScript, RScript.ReturnType.INTEGER,Arrays.asList(streamName, queueSizeMapName), lastSize);}}
package com.ricent.common.redis.stream.util;import cn.hutool.core.collection.CollUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** 延迟队列工具类** @author clq* @since 2024-11-11*/
@Slf4j
@UtilityClass
@SuppressWarnings("unused")
public class RedisDelayQueueUtil {public static final RedissonClient CLIENT = SpringUtil.getBean(RedissonClient.class);/*** 从目标队列中取出头部元素(为空时返回 null)** @param queueName 队列名称* @return 队列头部元素*/public static String poll(@NonNull String queueName) {RBlockingQueue<String> targetQueue = CLIENT.getBlockingQueue(queueName);CLIENT.getDelayedQueue(targetQueue);return targetQueue.poll();}/*** 从目标队列中取出头部元素(为空时阻塞线程)** @param queueName 队列名* @return 队列头部元素*/public static String take(@NonNull String queueName) throws InterruptedException {RBlockingQueue<String> targetQueue = CLIENT.getBlockingQueue(queueName);CLIENT.getDelayedQueue(targetQueue);return targetQueue.take();}/*** 从目标队列中获取头部元素(仅获取头部元素而不删除,为空时返回 null)** @param queueName 队列名* @return 队列头部元素*/public static String peek(@NonNull String queueName) {RBlockingQueue<String> targetQueue = CLIENT.getBlockingQueue(queueName);CLIENT.getDelayedQueue(targetQueue);return targetQueue.peek();}/*** 添加延迟队列** @param queueName 队列名称* @param element   队列元素* @param endTime   延迟至指定时间*/public static List<String> addDelayQueue(String queueName, String element, LocalDateTime endTime) {long seconds = Duration.between(LocalDateTime.now(), endTime).getSeconds();if (seconds > 0) {return addDelayQueue(queueName, element, seconds, TimeUnit.SECONDS);}throw new IllegalArgumentException("延迟时间必须大于当前时间!");}/*** 添加延迟队列** @param queueName 队列名称* @param element   队列元素* @param delay     延迟时间* @param timeUnit  时间单位*/public static List<String> addDelayQueue(String queueName, String element, long delay, TimeUnit timeUnit) {try {RBlockingQueue<String> blockingDeque = CLIENT.getBlockingQueue(queueName);RDelayedQueue<String> delayedQueue = CLIENT.getDelayedQueue(blockingDeque);delayedQueue.offer(element, delay, timeUnit);log.info("(添加延时队列成功) 队列名称:{},队列元素:{},延迟时间:{}", queueName, element, timeUnit.toSeconds(delay) + "秒");return delayedQueue.readAll();} catch (Exception e) {log.error("(添加延时队列失败) {}", e.getMessage());throw e;}}/*** 删除延迟队列中的消息** @param queueName 队列名称* @param element   队列元素* @return 是否删除成功*/public static boolean removeDelayedQueue(@NonNull String queueName, @NonNull String element) {if (StringUtils.isBlank(queueName)) {return false;}try {RBlockingDeque<String> blockingDeque = CLIENT.getBlockingDeque(queueName);RDelayedQueue<String> delayedQueue = CLIENT.getDelayedQueue(blockingDeque);boolean flag = delayedQueue.remove(element);if (flag) {log.info("(删除延时队列保证唯一性) 队列元素:{},队列元素:{}", queueName, element);}delayedQueue.destroy();return flag;} catch (Exception e) {log.error("(删除延时队列异常) 队列名称:{},队列元素:{},错误信息:{}", queueName, element, e.getMessage());throw e;}}/*** 将延迟队列中的消息发布到 Redis Stream** @param value     消息值* @param streamKey Stream 键* @param delay     延迟时间* @param timeUnit  时间单位*/public static void publishToStream(String streamKey, String value, long delay, TimeUnit timeUnit) {// 将消息添加至延迟队列List<String> list = addDelayQueue(streamKey, value, delay, timeUnit);// 到期后推送至 stream 队列if (CollUtil.isNotEmpty(list)) {pushToStreamAfterExpire(streamKey, value);}}/*** 将延迟队列中的消息发布到 Redis Stream** @param streamKey Stream 键* @param content   消息内容* @param endTime   延迟消费的时间*/public static void publishToStream(String streamKey, String content, LocalDateTime endTime) {// 将消息添加至延迟队列List<String> list = addDelayQueue(streamKey, content, endTime);if (CollUtil.isNotEmpty(list)) {// 到期后推送至 stream 队列pushToStreamAfterExpire(streamKey, content);}}/*** 时间到期后将消息推到 stream 消息队列** @param streamKey 队列名称* @param content   消息内容*/@SuppressWarnings("unchecked")private static void pushToStreamAfterExpire(String streamKey, String content) {// 开启一个新线程处理延迟消息,防止阻塞主线程new Thread(() -> {while (true) {String messages = "";try {// 从延迟消息队列拿取消息messages = take(streamKey);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.info("(线程被中断) Stream 键:{}", streamKey);}// 推送至streamMap<String, Object> messageMap = JSONUtil.toBean(messages, Map.class);RedisStreamUtil.push(RedisStreamUtil.BASE_STREAM_NAME + streamKey, messageMap);log.info("(发布延迟消息到 Redis Stream 成功) Stream 键:{},消息值:{}", streamKey, content);}}, "redis_delay_queue").start();}
}
4、最后在我们的消息模块定义 RedisEventBusUtils ,后续通过该工具类发布订阅消息。
package com.ricent.common.eventbus.utils;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import com.ricent.common.core.exception.ServiceException;
import com.ricent.common.redis.stream.util.RedisDelayQueueUtil;
import com.ricent.common.redis.stream.util.RedisStreamUtil;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** Redis Stream 消息工具类** @author clq* @since 2024/11/11*/
@Slf4j
@UtilityClass
public class RedisEventBusUtils {/*** 发送 Redis 消息事件** @param streamName 消息主题* @param messages   消息体*/public static void publishEvent(String streamName, Map<String, Object> messages) {validateParam(streamName, messages);RedisStreamUtil.push(RedisStreamUtil.BASE_STREAM_NAME + streamName, messages);}/*** 发送 Redis 延迟消息事件** @param streamName 消息主题* @param messages   消息体* @param delay      延迟时间* @param timeUnit   时间单位*/public static void publishEvent(String streamName, Map<String, Object> messages, Long delay, TimeUnit timeUnit) {validateParam(streamName, messages);RedisDelayQueueUtil.publishToStream(streamName, JSONUtil.toJsonStr(messages), delay, timeUnit);}/*** 发送 Redis 延迟消息事件** @param streamName 消息主题* @param messages   消息体* @param endTime    到期时间*/public static void publishEvent(String streamName, Map<String, Object> messages, LocalDateTime endTime) {validateParam(streamName, messages);RedisDelayQueueUtil.publishToStream(streamName, JSONUtil.toJsonStr(messages), endTime);}/*** 校验参数** @param streamName 消息主题* @param messages   消息体*/private static void validateParam(String streamName, Map<String, Object> messages) {if (CharSequenceUtil.isBlank(streamName)) {throw new ServiceException("streamName is blank:消息主题不能为空!");}if (CollUtil.isEmpty(messages)) {throw new ServiceException("messages is empty:消息体不能为空!");}}
}
5、至此,所有的代码配置工作已完成,接下来编写消费者实现业务逻辑和然后在实际业务需要的地方发布消息即可。

以下是一个使用 demo

消费者(继承BaseRedisStreamMessageListener类,重写consume方法,编写实际业务逻辑即可)

package com.ricent.meetingtask.consume;import com.ricent.common.redis.stream.listener.BaseRedisStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Map;/*** 消息监听器** @author clq* @since 2024/11/11*/
@Slf4j
@Component
public class MeetingTaskListener extends BaseRedisStreamMessageListener {public MeetingTaskListener() {this.streamName = "demo_task";}@Overridepublic void consume(String streamName, Map<String, Object> message) {log.info("--- 消费消息 --- stream :{} message :{}", streamName, message);// 实际业务逻辑}
}

生产者(在需要的地方发送订阅消息即可,注意Stream名称一定要一致)

RedisEventBusUtils.publishEvent("demo_task", message,30L, TimeUnit.MINUTES);

四、总结

1)Redis Stream相比于它原生的消息订阅方式,具有以下的优势:
  • 支持范围查找:内置的索引功能,可以通过索引来对消息进行范围查找
  • 支持阻塞操作:避免低效的反复轮询查找消息
  • 支持 ACK:可以通过确认机制来告知已经成功处理了消息,保证可靠性
  • 支持多个消费者:多个消费者可以同时消费同一个流,Redis 会确保每个消费者都可以独立地消费流中的消息
  • 支持消息持久化: Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
2)但是对比专门的MQ消息队列,功能还是相对简单
特性/功能Redis Stream专业 MQ 系统 (如 Kafka、RabbitMQ、RocketMQ)
持久化支持 RDB 和 AOF更强大的持久化和存储机制
主备复制支持更复杂的分布式复制和集群管理
消息确认(ACK)支持更完善的消息确认和处理机制
功能丰富度相对简单更多高级功能(如消息回溯、死信队列等)
集群管理和扩展性简单更强大的集群管理和扩展能力
资源消耗适合小到中等规模更适合大规模、高并发场景
社区支持和生态系统相对较小更丰富的社区支持和生态系统
 3)我们结合了 Redis 的延迟队列和 Redis Stream的消息队列实现了延迟消息队列,但是注意它具有以下优缺点:

优点:轻便,继承了Redis的高性能,并且占用资源低;

缺点:适用于延迟时间确定的场景,如果延迟时间一直变动,需要额外的逻辑来动态调整延迟时间,会增加系统的复杂性和维护难度。

4)适用场景
  • 定时任务调度:适合需要定时执行的任务,定时时间在任务发布时就已经确定。
  • 固定延迟的消息处理:适合延迟时间固定且不易变动的场景,如订单取消、优惠券过期等。
  • 轻量级应用:适合资源有限、性能要求高的小到中等规模的应用场景。

ps:以下是我整理的java面试资料,感兴趣的可以看看。最后,创作不易,觉得写得不错的可以点点关注!

链接:https://www.yuque.com/u39298356/uu4hxh?# 《Java知识宝典》 

相关文章:

使用 Redis Stream 结合 Redission DelayedQueue 实现延迟消息队列(已上线生产)

一、Redis Stream 介绍 Redis Stream 是 Redis 5.0 版本新增加的数据结构&#xff0c;作为一种轻量级的消息队列&#xff0c;Redis Stream 作为一种轻量级的消息队列&#xff0c;适合资源有限或对性能要求较高的场景。 Redis Stream 主要用于消息队列&#xff08;MQ&#xff…...

时频转换 | Matlab基于递归图Reccurence Plots一维数据转二维图像方法

目录 基本介绍程序设计参考资料获取方式 基本介绍 时频转换 | Matlab基于递归图Reccurence Plots一维数据转二维图像方法 程序设计 clear clc close allfs 6400 ; % 数据采样频率 N 5120; % 信号的点数% 生成时间向量 t (0:N-1) / fs; % 生成正弦信号 x sin(2 * pi * 15…...

《手写Spring渐进式源码实践》实践笔记 (第二十一章 将ORM框架整合到Spring容器中)

文章目录 第二十一章 ORM框架整合Spring背景目标设计实现代码结构类图实现步骤 测试事先准备属性配置文件测试用例测试结果&#xff1a; 总结 第二十一章 ORM框架整合Spring 背景 MyBatis-Spring 能够实现 MyBatis 与 Spring 框架的无缝集成。它使得 MyBatis 能够参与 Spring…...

数据库管理-第267期 23ai:Oracle Data Redaction演示(20241128)

数据库管理267期 2024-11-286 数据库管理-第267期 23ai&#xff1a;Oracle Data Redaction演示&#xff08;20241128&#xff09;1 示例表及数据2 创建编校策略2.1 名字全编校2.2 电话部分编校 3 DML演示3.1 场景13.2 场景2 总结 数据库管理-第267期 23ai&#xff1a;Oracle Da…...

NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测,含优化前后对比

NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测&#xff0c;含优化前后对比 目录 NGO-CNN-BiGRU-Attention北方苍鹰算法优化卷积双向门控循环单元时间序列预测&#xff0c;含优化前后对比预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介…...

第六届国际科技创新学术交流会暨管理科学信息化与经济创新发展(MSIEID 2024)

重要信息 大会官网&#xff1a;msieid2024.iaecst.org &#xff08;点击了解大会&#xff0c;参会等内容&#xff09; 大会时间&#xff1a;2024年12月6-8日 大会地点&#xff1a;中国-广州 大会简介 随着全球化和信息化的不断深入&#xff0c;管理科学、信息化和经济发展…...

【计算机视觉算法与应用】模板匹配、图像配准

目录 1. 基于灰度值的模板匹配 2. 基于相关性的模板匹配 3. 基于形状的模板匹配 4. 基于组件的模板识别 5. 基于形变的模板匹配 6. 基于描述符的模板匹配 7. 基于点的模板匹配 性能比较 模板匹配的算法实现需要结合具体需求和应用场景来选择方法。以下是基于 OpenCV 的…...

HHO-CNN-BiGRU-Attention哈里斯鹰优化算法卷积神经网络结合双向门控循环单元时间序列预测,含优化前后对比

HHO-CNN-BiGRU-Attention哈里斯鹰优化算法卷积神经网络结合双向门控循环单元时间序列预测&#xff0c;含优化前后对比 目录 HHO-CNN-BiGRU-Attention哈里斯鹰优化算法卷积神经网络结合双向门控循环单元时间序列预测&#xff0c;含优化前后对比预测效果基本介绍模型描述程序设计…...

数据并行、模型并行与张量并行:深度学习中的并行计算策略(中英双语)

中文版 数据并行、模型并行与张量并行&#xff1a;深度学习中的并行计算策略 随着深度学习模型的不断增大&#xff0c;单个计算节点&#xff08;例如单个 GPU&#xff09;的计算和内存能力逐渐成为了限制训练效率和模型规模的瓶颈。为了应对这些挑战&#xff0c;深度学习社区…...

大数据-239 离线数仓 - 广告业务 测试 FlumeAgent 加载ODS、DWD层

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…...

Python中的数据结构深入解析:从列表到字典的优化技巧

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! Python是一门以易用性和可读性著称的高级编程语言,其内置的数据结构为开发者提供了强大的工具,但了解其底层实现及性能优化策略却常被忽略。本文深入探讨Python中的核心数据结构,包括列表(list)、元组…...

【JS】JS判断数据类型

typeof // typeof 后面括号有没有都可以 console.log(typeof(a)) // string console.log(typeof(123)) // number console.log(typeof(undefined)) // undefined console.log(typeof(true)) // boolean console.log(typeof(Symbol(123))) // symbolconsole.log(typeof(null)) /…...

基于jmeter+perfmon的稳定性测试记录

软件测试资料领取&#xff1a;[内部资源] 想拿年薪40W的软件测试人员&#xff0c;这份资料必须领取~ 软件测试面试刷题工具领取&#xff1a;软件测试面试刷题【800道面试题答案免费刷】 1. 引子 最近承接了项目中一些性能测试的任务&#xff0c;因此决定记录一下&#xff0c…...

【0351】Postgres内核 Open WAL segment(包含 WAL 位置 ‘RecPtr’)(2 - 4)

上一篇: 文章目录 1. 打开 WAL Segment2. Standby mode 由一个 状态机(state machine)实现2.1 何处获取 WAL 文件?2.1.1 XLogSource2.1.2 从所选源(XLogSource )读取 XLOG2.1.2.1 walreceiver 运行状态 ?2.1.3 readFile(XLOG 文件句柄)1. 打开 WAL Segment 在经过前…...

Mysql基础

什么是关系型数据库&#xff1f; 顾名思义&#xff0c;关系型数据库&#xff08;RDB&#xff0c;Relational Database&#xff09;就是一种建立在关系模型的基础上的数据库。关系模型表明了数据库中所存储的数据之间的联系&#xff08;一对一、一对多、多对多&#xff09;。 …...

Altium Designer学习笔记 24 PCB初始布局2

基于Altium Designer 23学习版&#xff0c;四层板智能小车PCB 更多AD学习笔记&#xff1a;Altium Designer学习笔记 1-5 工程创建_元件库创建Altium Designer学习笔记 6-10 异性元件库创建_原理图绘制Altium Designer学习笔记 11-15 原理图的封装 编译 检查 _PCB封装库的创建Al…...

【从0学英语】形容词性/名词性物主代词是什么?

在英语中&#xff0c;物主代词是非常重要的语法概念之一&#xff0c;特别是对于初学者来说。理解形容词性物主代词和名词性物主代词的不同&#xff0c;能够帮助我们在日常对话中准确地表达拥有关系。在这篇文章中&#xff0c;我们将深入探讨这两个概念&#xff0c;并通过详细的…...

hhdb数据库介绍(10-29)

管理 数据备份 从存储节点或灾备机房数据备份 选择灾备机房类型、从库&#xff08;双主备库&#xff09;存储节点类型进行备份&#xff0c;页面根据选择类型&#xff0c;对应给出提示信息。发起备份时&#xff0c;检测从存储节点状态是否符合备份条件。 主从数据一致性检测…...

springboot(20)(删除文章分类。获取、更新、删除文章详细)(Validation分组校验)

目录 一、删除文章分类功能。 &#xff08;1&#xff09;接口文档。 1、请求路径、请求参数。 2、请求参数。 3、响应数据。 &#xff08;2&#xff09;实现思路与代码书写。 1、controller层。 2、service接口业务层。 3、serviceImpl实现类。 4、mapper层。 5、后端接口测试。…...

实战指南:理解 ThreadLocal 原理并用于Java 多线程上下文管理

目录 一、ThreadLocal基本知识回顾分析 &#xff08;一&#xff09;ThreadLocal原理 &#xff08;二&#xff09;既然ThreadLocalMap的key是弱引用&#xff0c;GC之后key是否为null&#xff1f; &#xff08;三&#xff09;ThreadLocal中的内存泄漏问题及JDK处理方法 &…...

Spark 内存管理机制

Spark 内存管理 堆内内存和堆外内存 作为一个 JVM 进程&#xff0c;Executor 的内存管理建立在 JVM(最小为六十四分之一&#xff0c;最大为四分之一)的内存管理之上&#xff0c;此外spark还引入了堆外内存&#xff08;不在JVM中的内存&#xff09;&#xff0c;在spark中是指不…...

【Maven】继承和聚合

5. Maven的继承和聚合 5.1 什么是继承 Maven 的依赖传递机制可以一定程度上简化 POM 的配置&#xff0c;但这仅限于存在依赖关系的项目或模块中。当一个项目的多个模块都依赖于相同 jar 包的相同版本&#xff0c;且这些模块之间不存在依赖关系&#xff0c;这就导致同一个依赖…...

NViST运行笔记

文章标题&#xff1a; NViST: In the Wild New View Synthesis from a Single Image with Transformers 1. 环境配置 创建环境 conda create -n nvist python3.9 进入环境 conda activate nvist 安装torch torchvision torchaudio pip install torch2.1.2 torchvision0…...

性能测试工具Grafana、InfluxDB和Collectd的搭建

一、性能监控组成简介 1、监控能力分工:这个系统组合能够覆盖从数据采集、存储到可视化的整个监控流程。Collectd可以收集各种系统和应用的性能指标,InfluxDB提供高效的时序数据存储,而 Grafana 则将这些数据以直观的方式呈现出来。2,实时性能监控:对于需要实时了解系统状…...

JS中的类与对象

面向对象是使用最广泛的一种编程范式&#xff0c;最具代表性的面向对象语言就是Java和C&#xff0c;在它们的理念中&#xff0c;面向对象的三大特性&#xff1a;封装&#xff0c;继承&#xff0c;多态。类&#xff0c;对象&#xff0c;公有/私有方法/属性&#xff0c;各种继承就…...

域名解析系统 DNS

1.域名系统概述 用户与互联网上某台主机通信时&#xff0c;必须要知道对方的IP地址。然而用户很难记住长达32 位的二进制主机地址。即使是点分十进制地址也并不太容易记忆。但在应用层为了便于用户记忆各种网络应用&#xff0c;连接在互联网上的主机不仅有P地址&#xff0c;而…...

Flutter 1.1:下载Flutter环境

1、在AS中下载Flutter插件 在setting的Plugins中下载Flutter&#xff0c;如图所示&#xff0c;可以直接进行搜索查找 2、下载flutter的sdk源代码 flutter中文文档学习 通过Git下载SDK源代码 git clone -b stable https://github.com/flutter/flutter.git3、配置系统变量 3…...

HTML5系列(6)-- 拖放 API 实战指南

前端技术探索系列&#xff1a;HTML5 拖放 API 实战指南 &#x1f3af; 致读者&#xff1a;探索现代交互技术 &#x1f44b; 前端开发者们&#xff0c; 今天我们将深入探讨 HTML5 中一个强大而实用的特性 —— 拖放 API。这项技术能够让我们创建更加直观和交互性强的用户界面…...

windows下kafka初体验简易demo

这里提供了windows下的java1.8和kafka3.9.0版本汇总&#xff0c;可直接免费下载 【免费】java1.8kafka2.13版本汇总资源-CSDN文库 解压后可以得到一个文件夹 资料汇总内有一个kafka文件资料包.tgz&#xff0c;解压后可得到下述文件夹kafka_2.13-3.9.0&#xff0c;资料汇总内还…...

算法训练(leetcode)二刷第三十三天 | *322. 零钱兑换、*279. 完全平方数、*139. 单词拆分

刷题记录 *322. 零钱兑换*279. 完全平方数*139. 单词拆分 *322. 零钱兑换 leetcode题目地址 dp[j]存储amount为j时所需要的最少硬币数。当j为0时需要0个硬币&#xff0c;因此dp[0]赋值为0. 因为是取最少硬币数&#xff0c;因此初始化需要赋值一个最大值。 状态转移方程&…...

windows的pip镜像源配置

Windows 中 pip 镜像源配置 在 Windows 系统中&#xff0c;为了提高 pip 包的安装速度&#xff0c;我们可以配置 pip 的镜像源。以下是具体的配置步骤&#xff1a; 创建文件夹 在 C:\Users\Administrator\pip 路径下创建一个名为 pip.ini 的文件。 编辑 pip.ini 文件 使用文本…...

Django Rest Framework中嵌套关系的JSON序列化

在 Django Rest Framework (DRF) 中&#xff0c;处理嵌套关系的 JSON 序列化是一个常见需求。以下是如何实现嵌套关系序列化的详细说明&#xff0c;包括序列化器定义、模型关系以及常见用法。 1、问题背景 假设我们有以下两个模型&#xff1a; class Jobdtl(models.Model):jo…...

ONVIF协议网络摄像机客户端使用gsoap获取RTSP流地址GStreamer拉流播放

什么是ONVIF协议 ONVIF&#xff08;开放式网络视频接口论坛&#xff09;是一个全球性的开放式行业论坛&#xff0c;旨在促进开发和使用基于物理IP的安全产品接口的全球开放标准。 ONVIF规范的目标是建立一个网络视频框架协议&#xff0c;使不同厂商生产的网络视频产品完全互通。…...

40分钟学 Go 语言高并发:Go程序性能优化方法论

Go程序性能优化方法论 一、性能指标概述 指标类型关键指标重要程度优化目标CPU相关CPU使用率、线程数、上下文切换⭐⭐⭐⭐⭐降低CPU使用率&#xff0c;减少上下文切换内存相关内存使用量、GC频率、对象分配⭐⭐⭐⭐⭐减少内存分配&#xff0c;优化GC延迟指标响应时间、处理延…...

MySQL基础(语句)知识复习 (除索引和视图)

1.客户端和数据库操作 1.登录客户端界面&#xff1a;mysql -uroot -p 2.查看当前的数据库版本&#xff1a;select version(); 3.显示所有数据库&#xff1a;show databases;&#xff0c; 4.创建数据库&#xff1a;create [IF NOT EXISTS] database 库名 character set 字符…...

【sqlcipher】pc端sqflite使用过程中遇到的问题

在flutter中使用sqlcipher时 Mac上如果通过flutter带的文件管理api&#xff08;即File的delete()方法&#xff09;删除数据库文件&#xff0c;再创建同名的数据文件的话&#xff0c;必现readonly问题&#xff0c; 这里需要注意的一点是 DatabaseFactory 在Mac上直接使用全局的…...

Vue 实现无线滚动效果

目录 1.Element-plus官网中的Infinite Scroll组件说明 2.滚动条设置 3.滚动到底部的函数调用 1.Element-plus官网中的Infinite Scroll组件说明 官网链接如下所示&#xff1a; Infinite Scroll 无限滚动 | Element Plus 首先查看该代码&#xff0c;发现这个组件使用了一个…...

【CSS in Depth 2 精译_062】第 10 章 CSS 中的容器查询(@container)概述 + 10.1 容器查询的一个简单示例

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 【第十章 CSS 容器查询】 ✔️ 10.1 容器查询的一个简单示例 ✔️ 10.1.1 容器尺寸查询的用法 ✔️ 10.2 深入理解容器10.3 与容器相关的单位10.4 容器样式查询的用法10.5 本章小结 文章目录 第 10…...

conda手动初始化

问题:环境中存在conda但是conda无法使用 方法: 进入到anaconda目录下, 进入bin目录, 然后执行 source activate要想启动时自动进入conda环境, 需要在 ~/.bashrc中添加如下命令 # >>> conda initialize >>> # !! Contents within this block are managed by …...

hhdb数据库介绍(10-28)

管理 管理菜单主要囊括对业务数据进行管理的功能&#xff0c;例如对数据的备份恢复或执行业务表的DDL语句等操作。 数据对象 数据对象功能可以帮助用户通过列表实时查看当前已存在的数据对象&#xff0c;了解业务数据的整体情况。提供了对数据对象的筛选、统计、关联、详情等…...

Spring Boot自定义启动banner

在启动 Springboot 应用时&#xff0c;默认情况下会在控制台打印出 Springboot 相关的banner信息。 自定义banner 如果你想自定义一个独特的启动banner&#xff0c;该怎么做呢&#xff1f;Springboot 允许我们通过自定义启动banner来替换默认的banner。只需要在 resources 目…...

c语言——数组名该如何理解呢?

一般情况下&#xff0c;数组名表示首元素地址&#xff0c;以下2种除外&#xff1a; ①、sizeof(数组名) 表示整个数组 ※只有数组名的情况 sizeof&#xff08;数组名i&#xff09; 就不能表示整个数组 ②、&数组名 表示整个数组&#xff0c;取的是整个数…...

前端 如何用 div 标签实现 步骤审批

在前端实现一个步骤审批流程&#xff0c;通常是通过 div 标签和 CSS 来构建一个可视化的流程图&#xff0c;结合 JavaScript 控制审批的状态变化。你可以使用 div 标签创建每一个步骤节点&#xff0c;通过不同的样式&#xff08;如颜色、边框等&#xff09;表示审批的不同状态&…...

QT工程,它该怎么学?

在现代软件开发中&#xff0c;QT因其强大的跨平台能力和友好的用户界面设计工具&#xff0c;成为开发者学习和应用的热门选择。特别是在Linux系统下&#xff0c;如何安装、配置QT开发环境&#xff0c;以及创建和管理QT工程是入门QT开发的关键环节。本文将从安装QT开发环境开始&…...

第426场周赛:仅含置位位的最小整数、识别数组中的最大异常值、连接两棵树后最大目标节点数目 Ⅰ、连接两棵树后最大目标节点数目 Ⅱ

Q1、仅含置位位的最小整数 1、题目描述 给你一个正整数 n。 返回 大于等于 n 且二进制表示仅包含 置位 位的 最小 整数 x 。 置位 位指的是二进制表示中值为 1 的位。 2、解题思路 我们需要找到一个整数 x&#xff0c;使得&#xff1a; x ≥ nx 的二进制表示中仅包含置位…...

23种设计模式之外观模式

目录 1. 简介2. 代码2.1 SelectFoodService (选择食品)2.2 PayService (支付服务)2.3 TakeService (制作服务)2.4 OrderService (下单服务)2.5 Food (食品)2.6 TackingSystem &#xff08;外观类&#xff09;2.7 Test &#xff08;测试类&#xff09; 3. 优缺点3. 总结 1. 简介…...

【智商检测——DP】

题目 代码 #include <bits/stdc.h> using namespace std; const int N 1e510, M 110; int f[N][M]; int main() {int n, k;cin >> n >> k;for(int i 1; i < n; i){int x;cin >> x;f[i][0] __gcd(f[i-1][0], x);for(int j 1; j < min(i, k)…...

LeetCode-430. 扁平化多级双向链表-题解

题目链接 430. 扁平化多级双向链表 - 力扣&#xff08;LeetCode&#xff09; 题目介绍 你将得到一个双链表&#xff0c;节点包含一个“下一个”指针、一个“前一个”指针和一个额外的“子指针”。这个子指针可能指向一个单独的双向链表&#xff0c;并且这些链表也包含类似的特殊…...

【CSS】一篇掌握CSS

不是因为有了希望才去坚持,而是坚持了才有了希望 目录 一.导入方式 1.行内样式 2.内部样式 3.外部样式(常用) 二.选择器 1.基本选择器(常用) 1.1标签选择器 1.2类选择器 1.3id选择器 2.层次选择器 2.1后代选择器 2.2子选择器 2.3相邻兄弟选择器 2.4通用兄弟选择器…...

华为仓颉编程环境搭建

1、仓颉介绍 摘自华为官方&#xff1a;仓颉编程语言作为一款面向全场景应用开发的现代编程语言&#xff0c;通过现代语言特性的集成、全方位的编译优化和运行时实现、以及开箱即用的 IDE 工具链支持&#xff0c;为开发者打造友好开发体验和卓越程序性能。 其具体特性表现为&am…...