预订接口优化:使用本地消息表保证订单生成、库存扣减的一致性
🎯 本文介绍了一种优化预订接口的方法,通过引入本地消息表解决分布式事务中的最终一致性问题。原先的实现是在一个事务中同时扣减库存和创建订单,容易因网络不稳定导致数据不一致。改进后的方法将业务操作和消息发送封装在本地事务中,并利用MQ进行异步解耦,确保了即使在网络故障时也能保证系统的数据一致性。此外,还设计了定时任务重试机制以及幂等性保障措施来进一步确保消息被成功处理,从而实现了高效可靠的分布式事务处理。
说明
在前面的预订实现中,是先开启一个事务,然后去扣减库存,再通过RPC调用订单服务来创建订单,如果订单创建成功,就提交事务;否则回滚事务。代码实现如下:
/*** 执行下单和数据库库存扣减操作** @param timePeriodDO* @param courtIndex* @param venueId* @return*/
@Override
public OrderDO executePreserveV1(TimePeriodDO timePeriodDO,Long courtIndex, Long venueId,String stockKey, String freeIndexBitMapKey) {// 编程式开启事务,减少事务粒度,避免长事务的发生return transactionTemplate.execute(status -> {try {// 扣减当前时间段的库存,修改空闲场信息baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);// 调用远程服务创建订单OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder().timePeriodId(timePeriodDO.getId()).partitionId(timePeriodDO.getPartitionId()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).courtIndex(courtIndex).userId(UserContext.getUserId()).userName(UserContext.getUsername()).venueId(venueId).payAmount(timePeriodDO.getPrice()).build();Result<OrderDO> result;try {result = orderFeignService.generateOrder(orderGenerateReqDTO);if (result == null || !result.isSuccess()) {// --if-- 订单生成失败,抛出异常,上面的库存扣减也会回退throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);}} catch (Exception e) {// --if-- 订单生成服务调用失败// 恢复缓存中的信息this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(),UserContext.getUserId(),courtIndex,stockKey,freeIndexBitMapKey);// todo 如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,此时需要将订单置为废弃状态(即删除)// 发送一个短暂的延时消息(时间过长,用户可能已经支付),去检查订单是否生成,如果生成,将其删除// 打印错误堆栈信息e.printStackTrace();// 把错误返回到前端throw new ServiceException(e.getMessage());}return result.getData();} catch (Exception ex) {status.setRollbackOnly();throw ex;}});
}
但是网络有时候是不稳定的,假如订单服务创建订单成功,但是由于网络原因,没办法将订单数据返回给库存服务。这时候库存服务就会误认为订单服务出错,进而回滚了事务。这样,就出现了订单创建成功,但是库存却没有扣减,出现了不一致问题,这种不一致会导致超卖。
由于库存扣减、订单生成处于不同的服务中,双方无法使用本地事务来保证两者的一致性,这属于分布式事务。常见的分布式事务解决方案有:
- 强一致:2PC、3PC、TCC、Saga模式
- 最终一致:本地消息表、MQ事务消息、最大努力通知
- 工具:Seata
本文使用比较常用的本地消息表来解决
本地消息表介绍
本地消息表的核心思想:将分布式事务拆分为本地事务+异步消息,通过本地事务保证消息的可靠存储,通过重试机制确保远程业务最终执行成功。
核心步骤
- 本地事务与消息写入 业务执行时,先在本地数据库完成业务操作,同时将待发送的消息(含业务ID、状态等)插入同一事务的
消息表
,利用本地事务的ACID特性保证两者原子性。 - 异步轮询消息 后台定时任务扫描
消息表
中状态为"待发送"的消息,调用下游服务的接口。 - 下游服务处理 下游服务执行业务逻辑,成功后返回确认;若失败或超时,触发重试(需保证接口幂等性)。
- 消息状态更新 下游处理成功后,更新本地
消息表
中该消息状态为"已完成";若多次重试失败则标记为"失败",人工介入处理。
关键点
- 可靠性:消息表与业务数据同库,本地事务确保业务执行成功,本地消息就会记录成功
- 异步解耦:通过异步重试替代同步阻塞,提高系统吞吐量
- 幂等性:下游服务调用要支持幂等性,不然重复消费可能出问题
本文实践过程
- 预订接口首先通过缓存验证用户是否预订成功,预订成功就发送一条预订消息到MQ
- 订单服务去消费预订消息,通过本地事务保证插入订单、插入本地消息的原子性
- 通过定时任务轮询本地消息表中还没有执行成功的消息,将任务投递到MQ中,后面让库存服务去消费,进行库存扣减(当然这里也可以直接通过RPC调用库存服务扣减,但是为了解耦两个服务,本文使用MQ来实现)
- 注意:库存服务执行库存扣减的时候,需要保证幂等性。即一个订单扣减过库存之后,不允许再扣减第二次。
数据库设计
首先需要创建一个表,用来记录本地消息
CREATE TABLE `local_message` (`id` bigint NOT NULL COMMENT '主键ID',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '逻辑删除 0:未删除 1:已删除',`msg_id` varchar(64) NOT NULL COMMENT '唯一消息ID',`topic` varchar(200) NOT NULL COMMENT '消息Topic',`tag` varchar(200) NOT NULL DEFAULT '' COMMENT '消息Tag',`content` text NOT NULL COMMENT '消息内容(JSON格式)',`status` tinyint NOT NULL DEFAULT '0' COMMENT '消息状态 0:待发送 1:消费失败 2:消费成功 3:超过重试次数',`fail_reason` varchar(1000) DEFAULT NULL COMMENT '失败原因',`retry_count` int NOT NULL DEFAULT '0' COMMENT '已重试次数',`next_retry_time` bigint NOT NULL DEFAULT '0' COMMENT '下次重试时间戳(毫秒)',`max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重试次数',PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_status_retry` (`status`, `next_retry_time`),KEY `idx_topic_tag` (`topic`, `tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='本地消息表';
业务实现
枚举
package com.vrs.enums;import lombok.Getter;
import lombok.RequiredArgsConstructor;/*** 场馆类型枚举*/
@RequiredArgsConstructor
public enum LocalMessageStatusEnum {INIT(0, "待发送"),SEND_FAIL(1, "消费失败"),SEND_SUCCESS(2, "消费成功"),ARRIVE_MAX_RETRY_COUNT(3, "超过重试次数"),;@Getterprivate final int status;@Getterprivate final String msg;}
预订
首先验证令牌是否充足,充足就发送一条预订消息到 MQ
/*** 尝试获取令牌,令牌获取成功之后,发送消息,异步执行库存扣减和订单生成* 注意:令牌在极端情况下,如扣减令牌之后,服务宕机了,此时令牌的库存是小于真实库存的* 如果查询令牌发现库存为0,尝试去数据库中加载数据,加载之后库存还是0,说明时间段确实售罄了* 使用消息队列异步 扣减库存,更新缓存,生成订单** @param timePeriodId* @param courtIndex*/
@Override
public String reserve2(Long timePeriodId, Integer courtIndex) { 参数校验:使用责任链模式校验数据是否正确TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId, courtIndex);chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);Long venueId = timePeriodReserveReqDTO.getVenueId();VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO();PartitionDO partitionDO = timePeriodReserveReqDTO.getPartitionDO();TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO(); 使用lua脚本获取一个空场地对应的索引,并扣除相应的库存,同时在里面进行用户的查重// 首先检测空闲场号缓存有没有加载好,没有的话进行加载this.checkBitMapCache(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId()),timePeriodId,partitionDO.getNum());// 其次检测时间段库存有没有加载好,没有的话进行加载this.getStockByTimePeriodId(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId());// todo 判断是否还有令牌,没有的话,重新加载(注意要分布式锁)// 执行lua脚本Long freeCourtIndex = executeStockReduceByLua(timePeriodReserveReqDTO,venueDO,courtIndex, RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);if (freeCourtIndex == -2L) {// --if-- 用户已经购买过该时间段throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);} else if (freeCourtIndex == -1L) {// --if-- 没有空闲的场号,查询数据库,如果数据库中有库存,删除缓存,下一个用户预定时重新加载令牌this.refreshTokenByCheckDataBase(timePeriodId);throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);} 发送消息,异步更新库存并生成订单String orderSn = SnowflakeIdUtil.nextId() + String.valueOf(UserContext.getUserId() % 1000000);SendResult sendResult = executeReserveProducer.sendMessage(ExecuteReserveMqDTO.builder().orderSn(orderSn).timePeriodId(timePeriodId).courtIndex(freeCourtIndex).venueId(venueId).userId(UserContext.getUserId()).userName(UserContext.getUsername()).partitionId(partitionDO.getId()).price(timePeriodDO.getPrice()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).build());if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {log.error("消息发送失败: " + sendResult.getSendStatus());// 恢复令牌缓存this.restoreStockAndBookedSlotsCache(timePeriodId,UserContext.getUserId(),freeCourtIndex,RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);throw new ServiceException(BaseErrorCode.MQ_SEND_ERROR);}return orderSn;
}
订单生成
消息预订消息,执行订单创建,并插入本地事务
/*** 消费预订之后的消息* 生成订单、生成本地消息** @param message*/
@Override
public void generateOrder(ExecuteReserveMqDTO message) {OrderDO orderDO = OrderDO.builder()// 订单号使用雪花算法生成分布式ID,然后再拼接用户ID的后面六位.orderSn(message.getOrderSn()).orderTime(new Date()).venueId(message.getVenueId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).timePeriodId(message.getTimePeriodId()).periodDate(message.getPeriodDate()).beginTime(message.getBeginTime()).endTime(message.getEndTime()).userId(message.getUserId()).userName(message.getUserName()).payAmount(message.getPrice()).orderStatus(OrderStatusConstant.UN_PAID).build();TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO = TimePeriodStockReduceMqDTO.builder().orderSn(message.getOrderSn()).timePeriodId(message.getTimePeriodId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).build();LocalMessageDO stockReduceLocalMessageDO = LocalMessageDO.builder().msgId(message.getOrderSn()).topic(RocketMqConstant.VENUE_TOPIC).tag(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG).content(JSON.toJSONString(timePeriodStockReduceMqDTO)).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();LocalMessageDO delayCloseLocalMessageD0 = LocalMessageDO.builder().msgId(SnowflakeIdUtil.nextIdStr()).topic(RocketMqConstant.ORDER_TOPIC).tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG).content(JSON.toJSONString(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build())).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();// 使用编程式事务,保证订单创建、本地消息插入的一致性boolean success = transactionTemplate.execute(status -> {try {int insertCount = baseMapper.insert(orderDO);localMessageService.save(stockReduceLocalMessageDO);// 也保存一个本地消息,进行兜底。防止事务提交成功之后就宕机,延时消息没有发生成功localMessageService.save(delayCloseLocalMessageD0);return insertCount > 0;} catch (Exception ex) {status.setRollbackOnly();throw ex;}});if (success) {// 发送延时消息来关闭未支付的订单SendResult sendResult = orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {// 延迟关单已经发生成功,后面扫描的时候,无需再处理LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(delayCloseLocalMessageD0.getId());localMessageDO.setStatus(LocalMessageStatusEnum.INIT.getStatus());localMessageService.updateById(localMessageDO);}// todo 如果出现宕机,可能出现宕机,但是 websocket 消息没有消息,所以前端还要实现一个轮询来保底// 通过 websocket 发送消息,通知前端websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder().toUsername(orderDO.getUserName()).message(JSON.toJSONString(orderDO)).build());}
}
定时任务
- 定期扫描本地消息表(
local_message
)中待处理(未处理、上次处理失败、下次重试时间小于等于现在)的消息 - 根据消息 Topic 和 tag 调用不同的消息处理器,将本地消息投递到消息队列中
- 消息投递成功后更新消息状态,失败则通过指数退避算法计算下次重试时间,等待下次重试
- 使用分布式锁保证集群环境下只有一个实例执行任务
【性能优化】
- 使用流式查询,避免分页查询的无效扫描
- 通过批量修改优化单条修改的效率
【策略模式】
- 通过策略模式,根据不同的 tag 获得不同的 MQ 生产者,避免
if else
代码
package com.vrs.service.scheduled;import com.alibaba.fastjson2.JSON;
import com.vrs.constant.RocketMqConstant;
import com.vrs.design_pattern.strategy.MessageProcessor;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.domain.entity.LocalMessageDO;
import com.vrs.enums.LocalMessageStatusEnum;
import com.vrs.rocketMq.producer.OrderDelayCloseProducer;
import com.vrs.rocketMq.producer.TimePeriodStockReduceProducer;
import com.vrs.service.LocalMessageService;
import jakarta.annotation.PostConstruct;
import lombok.Cleanup;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Author dam* @create 2024/11/17 16:44*/
@Component
@RequiredArgsConstructor
@Slf4j
public class LocalMessageScheduledScan {private final DataSource dataSource;private final LocalMessageService localMessageService;private final TimePeriodStockReduceProducer timePeriodStockReduceProducer;private final OrderDelayCloseProducer orderDelayCloseProducer;private final RedissonClient redissonClient;/*** 使用策略模式处理消息*/// todo 可以优化策略模式的写法,方便代码扩展private final Map<String, MessageProcessor> messageProcessors = new HashMap<>();private final int BATCH_SIZE = 1000;/*** 注册 tag 和其对应的消息处理器*/@PostConstructpublic void init() {messageProcessors.put(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG, mqDTO -> {TimePeriodStockReduceMqDTO dto = JSON.parseObject(mqDTO.getContent(), TimePeriodStockReduceMqDTO.class);return timePeriodStockReduceProducer.sendMessage(dto);});messageProcessors.put(RocketMqConstant.ORDER_DELAY_CLOSE_TAG, mqDTO -> {OrderDelayCloseMqDTO dto = JSON.parseObject(mqDTO.getContent(), OrderDelayCloseMqDTO.class);return orderDelayCloseProducer.sendMessage(dto);});}/*** 定时任务:扫描并处理本地消息* 每分钟执行一次*/@Scheduled(cron = "0 */1 * * * ?")@SneakyThrowspublic void processLocalMessage() {RLock lock = redissonClient.getLock("LocalMessageScan");boolean locked = false;try {locked = lock.tryLock(1, TimeUnit.MINUTES);if (!locked) {log.warn("获取分布式锁失败,跳过本次处理");return;}log.info("开始扫描本地消息表...");long start = System.currentTimeMillis();@Cleanup Connection conn = dataSource.getConnection();@Cleanup Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(Integer.MIN_VALUE);// 查询sql,只查询关键的字段String sql = "SELECT id,msg_id,topic,tag,content,retry_count,max_retry_count,next_retry_time FROM local_message where " +"is_deleted = 0 and (status = 0 OR status = 1) and next_retry_time<" + start;@Cleanup ResultSet rs = stmt.executeQuery(sql);List<LocalMessageDO> localMessageBuffer = new ArrayList<>();while (rs.next()) {// 获取数据中的属性LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(rs.getLong("id"));localMessageDO.setMsgId(rs.getString("msg_id"));localMessageDO.setTopic(rs.getString("topic"));localMessageDO.setTag(rs.getString("tag"));localMessageDO.setContent(rs.getString("content"));localMessageDO.setRetryCount(rs.getInt("retry_count"));localMessageDO.setMaxRetryCount(rs.getInt("max_retry_count"));localMessageDO.setNextRetryTime(rs.getLong("next_retry_time"));if (localMessageDO.getRetryCount() > localMessageDO.getMaxRetryCount()) continue;localMessageBuffer.add(localMessageDO);if (localMessageBuffer.size() > BATCH_SIZE) {batchProcessMessages(localMessageBuffer);localMessageBuffer.clear();}}if (!localMessageBuffer.isEmpty()) {batchProcessMessages(localMessageBuffer);}log.info("结束扫描本地消息表..." + (System.currentTimeMillis() - start) + "ms");} catch (Exception e) {log.error("处理本地消息表时发生异常", e);throw e; // 或根据业务决定是否抛出} finally {if (locked && lock.isHeldByCurrentThread()) {lock.unlock();}}}/*** 批量处理消息*/private void batchProcessMessages(List<LocalMessageDO> messages) {// 成功和失败的消息分开处理List<Long> successIds = new ArrayList<>();List<Long> retryIds = new ArrayList<>();List<Long> arriveMaxRetryCountIds = new ArrayList<>();Map<Long, String> failureReasons = new HashMap<>();for (LocalMessageDO message : messages) {try {if (message.getRetryCount() > message.getMaxRetryCount()) {// 已经到达最大重试次数arriveMaxRetryCountIds.add(message.getId());continue;}MessageProcessor processor = messageProcessors.get(message.getTag());SendResult sendResult = processor.process(message);if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {successIds.add(message.getId());} else {retryIds.add(message.getId());failureReasons.put(message.getId(), "MQ发送状态: " + sendResult.getSendStatus());}} catch (Exception e) {log.error("处理消息 {} 时发生异常", message.getMsgId(), e);retryIds.add(message.getId());failureReasons.put(message.getId(), "处理异常: " + e.getMessage());}}// 批量更新状态if (!successIds.isEmpty()) {batchUpdateMessagesStatus(successIds, LocalMessageStatusEnum.SEND_SUCCESS);}if (!arriveMaxRetryCountIds.isEmpty()) {// todo 通知人工处理batchUpdateMessagesStatus(arriveMaxRetryCountIds, LocalMessageStatusEnum.ARRIVE_MAX_RETRY_COUNT);}if (!retryIds.isEmpty()) {batchUpdateRetryMessages(retryIds, failureReasons);}}/*** 批量更新消息状态*/private void batchUpdateMessagesStatus(List<Long> ids, LocalMessageStatusEnum status) {if (ids.isEmpty()) return;List<LocalMessageDO> updates = ids.stream().map(id -> {LocalMessageDO update = new LocalMessageDO();update.setId(id);update.setStatus(status.getStatus());if (status == LocalMessageStatusEnum.SEND_FAIL) {update.setRetryCount(localMessageService.getById(id).getMaxRetryCount());}return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 批量更新重试消息*/private void batchUpdateRetryMessages(List<Long> ids, Map<Long, String> failReasons) {if (ids.isEmpty()) return;List<LocalMessageDO> messages = localMessageService.listByIds(ids);List<LocalMessageDO> updates = messages.stream().map(message -> {LocalMessageDO update = new LocalMessageDO();update.setId(message.getId());update.setStatus(LocalMessageStatusEnum.SEND_FAIL.getStatus());update.setRetryCount(message.getRetryCount() + 1);update.setNextRetryTime(getNextRetryTime(message.getRetryCount() + 1));update.setFailReason(failReasons.get(message.getId()));return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 获取下次重试时间** @param retryCount* @return*/private long getNextRetryTime(int retryCount) {long interval = (long) Math.min(Math.pow(2, retryCount) * 1000, 3600 * 1000);return System.currentTimeMillis() + interval;}
}
库存扣减
注意库存扣减需要通过幂等组件来保证消费幂等性,key 是订单号,即保证同一个订单号只能扣减库存一次
package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG,messageModel = MessageModel.CLUSTERING,// 监听tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG
)
@RequiredArgsConstructor
public class TimePeriodStockReduceListener implements RocketMQListener<MessageWrapper<TimePeriodStockReduceMqDTO>> {private final TimePeriodService timePeriodService;/*** 消费消息的方法* 方法报错就会拒收消息** @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "time_period_stock_reduce:",key = "#messageWrapper.getMessage().getOrderSn()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(MessageWrapper<TimePeriodStockReduceMqDTO> messageWrapper) {// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)log.info("[消费者] 更新时间段的库存和空闲场号,时间段ID:{}", messageWrapper.getMessage().getTimePeriodId());timePeriodService.reduceStockAndBookedSlots(messageWrapper.getMessage());}
}
【service】
/*** 扣减库存** @param timePeriodStockReduceMqDTO*/
@Override
public void reduceStockAndBookedSlots(TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO) {baseMapper.updateStockAndBookedSlots(timePeriodStockReduceMqDTO.getTimePeriodId(), timePeriodStockReduceMqDTO.getPartitionId(), timePeriodStockReduceMqDTO.getCourtIndex());
}
【mapper】
<update id="updateStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}]]>
</update>
相关文章:
预订接口优化:使用本地消息表保证订单生成、库存扣减的一致性
🎯 本文介绍了一种优化预订接口的方法,通过引入本地消息表解决分布式事务中的最终一致性问题。原先的实现是在一个事务中同时扣减库存和创建订单,容易因网络不稳定导致数据不一致。改进后的方法将业务操作和消息发送封装在本地事务中…...
深度学习与 PyTorch 基础
笔记 1 深度学习简介 1.1 深度学习概念 深度学习是机器学习的一类算法, 以人工神经网络为结构, 可以实现自动提取特征 深度学习核心思想是人工神经网络为结构, 自动提取特征 1.2 深度学习特点 自动提取特征 解释性差 大量数据和高性能计算能力 非线性转换(引入非线性因…...
libevent库详解:高性能异步IO的利器
目录 一、libevent 简介 主要特点: 二、事件模型原理 1. event_base 2. event 3. evconnlistener(TCP监听器) 4. bufferevent 简化流程如下: 三、libevent 使用示例 1. 创建事件主循环 2. 创建监听器(TCP&a…...
第一章:A Primer on Memory Consistency and Cache Coherence - 2nd Edition
引言: 许多现代计算机系统,包括同构和异构架构的系统,都在硬件层面支持共享内存。在共享内存系统中,每个处理器核心都可以对单一的共享地址空间进行读写操作。对于共享内存计算机而言,内存一致性模型定义了其内存系统在…...
NVIDIA Omniverse在数字孪生中的算力消耗模型构建方法
引言:虚拟实验室的算力经济学 在高校虚拟实验室建设中,数字孪生系统的实时物理仿真精度与算力成本之间存在显著矛盾。以H800 GPU集群为例,单个8卡节点每秒可处理2.3亿个物理粒子交互,但若未建立精准的算力消耗模型,资…...
C++ 动态内存管理详讲
1. 四个全局函数的定义与作用 这四个函数只负责空间的开辟和释放,不会调构造和析构 (1) ::operator new cpp void* operator new(size_t size); // 全局版本 功能:分配 size 字节的未初始化内存。 底层实现:调用 malloc(size)。 调用场…...
纹理对象创建
纹理对象通俗点就是贴图,像游戏的皮肤什么就是纹理。常间的结构就是激活纹理单元(0-15有16个),将纹理对象挂在纹理单元上,纹理采样器需要采哪个样品就与哪个单元挂钩就行了,加载纹理对象需要用到stb_image库…...
如何利用dify 生成Fine‑tune 需要的Alpaca 格式数据
如果你选择llamafactory 格式进行微调,它只是格式是Alpaca格式,dify 的agent dsl 如下,你可以导入本地的dify 或者导入cloud 版本的;测试版本是0.1.5 app:description: 上传文件,基于文件内容,使用 Silico…...
软件第三方测试:关键部分、意义、流程及方法全解析?
软件第三方测试是保障软件质量的关键部分,它由专业的机构来开展,这个机构不隶属于开发方和使用方,能以客观公正的视角找出软件问题。 测试意义 软件第三方测试意义重大,它依靠专业技术,依照严格流程,对软…...
贪心算法解决会议安排问题
文章目录 前言 一、什么是贪心算法? 贪心算法的基本概念:贪心算法并不从整体最优上加以考虑,所做的选择只是在某种意义上的局部最优选择。 二、会议安排题目 1.题目理解 2.思路剖析 总结 前言 本文将主要介绍贪心算法需要注意的地方以…...
高露洁牙膏是哪个国家的品牌?高露洁牙膏哪一款最好?
高露洁是来自于美国一个比较有知名度的品牌,在1806年的时候创立。总部是在美国纽约公园大道,在1873年时,高露洁就已经开始销售罐装牙膏。 在1896年时期推出可折叠管牙膏,在口腔护理产品发展的过程中拥有着不容忽视的地位。在1992…...
lin接口在线计算数据帧的校验位
在线校验计算链接:https://linchecksumcalculator.machsystems.cz/ 插入图片:...
Linux-07-Shell
一、Shell概述: Shell是一个命令行解释器,它接受应用程序/用户命令,然后调用操作系统内核 二、Shell中的变量: 1.系统预定义的变量: $HOME,$PWD,$SHELL,$USER等 2.用户自定义的变量: (1).基本语法: 定义变量:变量名变量值,注意前后不能…...
【云盘】使用阿里云盘托管项目大文件
【云盘】使用阿里云盘托管项目大文件 由于经常需要切换服务器运行项目实验,不同服务器在项目实验过程中会产生不同的数据、模型等较大文件,不能像代码那样能够使用git托管,因此考虑使用阿里云盘作为”第三方平台“托管这些大文件。 一、使用…...
《缓存策略:移动应用网络请求的“效能密钥” 》
用户体验无疑是重中之重,而网络请求性能,恰似一座桥梁,连接着用户与应用丰富的内容和功能。当网络不佳或者请求频繁时,缓慢的响应速度常常让用户兴致索然,甚至可能导致用户流失。此时,缓存策略就如同一位幕…...
深入解析C++11委托构造函数:消除冗余初始化的利器
一、传统构造函数的痛点 在C11之前,当多个构造函数需要执行相同的初始化逻辑时,开发者往往面临两难选择: class DataProcessor {std::string dataPath;bool verbose;int bufferSize; public:// 基础版本DataProcessor(const std::string&am…...
文章七《深度学习调优与超参数优化》
🚀 文章7:深度学习调优与超参数优化——你的AI模型需要一场"整容手术" 一、模型调优核心策略:像调整游戏装备一样优化模型 1. 学习率调整:掌控训练的"油门踏板" 比喻:把模型训练想象成赛车游戏&…...
python入门(1)变量与输入输出
一、变量 使用规则 变量名值例子 a13变量名规则 变量名可以用大小写字母、数字、下划线。 数字、下划线不可开头 例子 name name1 1name name_first _first 二、输入输出 输出print print(*objects,sep"",end"\n") objects:多个要输出的值 sep:每个…...
藏文情感分析器入门学习实践
🎯 项目目标: 输入一段藏文短句。自动分析这句话的情感倾向:积极(正面)/消极(负面)/中立。 🔍 技术原理简介 情感分析是什么? 情感分析(Sentiment Analysi…...
爱胜品ICSP YPS-1133DN Plus黑白激光打印机报“自动进纸盒进纸失败”处理方法之一
故障现象如下图提示: 用户的爱胜品ICSP YPS-1133DN Plus黑白激光打印机在工作过程中提示自动进纸盒进纸失败并且红色故障灯闪烁; 给出常见故障一般处理建议如下: 当您的爱胜品ICSP YPS-1133DN Plus 黑白激光打印机出现“自动进纸盒进纸失败”…...
数据库索引重建与优化操作在数据库性能维护与数据更新频繁场景下的应用
数据库索引重建与优化操作在数据库性能维护与数据更新频繁场景下的应用 数据库索引的作用与重要性 索引的定义与作用 数据库索引是一种特殊的数据结构,用于加快数据库表的数据检索速度。它类似于书籍的目录,能够快速定位到需要的数据页,而不必…...
前端应用开发技术历程的简要概览
前端应用开发技术详解 一、萌芽期(1990s - 2004) 技术特征 HTML 3.2 / HTML 4.01 是主流版本。 样式用 CSS1/CSS2,但大部分样式写在 <style> 标签甚至行内。 动态效果主要通过 JavaScript 控制 DOM,兼容性极差。 代表事…...
SPOJ 11576 TRIP2 - A Famous King’s Trip 【Tarjan+欧拉回路】
自我吐槽 (哭 题目传送门 SPOJ 洛谷 题目大意 让你在简单无向图上删去2条边,使该图联通并存在欧拉回路 输出字典序最小的一对边 思路 考虑到存在欧拉回路的充要条件,即 i n x ≡ 0 ( m o d 2 ) ∀ i ( 1 ≤ i ≤ n ) in_x\equiv 0 (\m…...
DeepSeek R1:强化学习范式的推理强化模型
定位与目标 DeepSeek R1 的推出并非 DeepSeek V3 的简单迭代,而是一次在训练范式上的大胆探索。与传统大模型主要依靠监督微调(SFT)后进行强化学习不同,R1 将重点放在推理能力和行为对齐上,尝试通过大规模强化学习直接激发模型的推理潜力。其目标是利用强化学习的反馈机制,…...
ubuntu22.04安装显卡驱动与cuda+cuDNN
背景: 紧接前文:Proxmox VE 8.4 显卡直通完整指南:NVIDIA 2080 Ti 实战。在R740服务器完成了proxmox的安装,并且安装了一张2080ti 魔改22g显存的的显卡。配置完了proxmox显卡直通,并将显卡挂载到了vm 301(…...
使用python爬取百度搜索中关于python相关的数据信息
Python爬取百度搜索"Python"相关数据信息 一、准备工作 在开始爬取之前,需要了解以下几点: 百度搜索有反爬机制,需要合理设置请求头百度搜索结果页面结构可能会变化需要遵守robots.txt协议(百度允许爬取搜索结果&…...
Bootstrap(自助法):无需假设分布的统计推断工具
核心思想 Bootstrap 是一种重采样(Resampling)技术,通过在原始数据中有放回地重复抽样,生成大量新样本集,用于估计统计量(如均值、方差)的分布或模型性能的不确定性。 …...
lib和dll介绍和VS2019生成实例
目录 lib文件和dll文件的作用dll和lib的优缺点VS2019 编译YOLOv5的dll和lib lib文件和dll文件的作用 (1)lib是编译时需要的,dll是运行时需要的。 如果要完成源代码的编译,有lib就够了。 如果也使动态连接的程序运行起来,有dll就够了。 在开发…...
tinycudann安装过程加ubuntu18.04gcc版本的升级(成功版!!!!)
使用的是 Linux,安装以下软件包 sudo apt-get install build-essential git安装 CUDA 并将 CUDA 安装添加到您的 PATH。 例如,如果您有 CUDA 12.6.3,请将以下内容添加到您的/usr/local/~/.bashrcexport PATH"/usr/local/cuda-12.6.3/bi…...
数字智慧方案5869丨智慧健康医疗养老大数据整体规划方案(76页PPT)(文末有下载方式)
资料解读:智慧健康医疗养老大数据整体规划方案 详细资料请看本解读文章的最后内容。 随着科技的飞速发展,健康医疗领域正经历着一场深刻的变革。特别是在大数据和人工智能技术的推动下,智慧健康医疗养老的整体规划方案逐渐浮出水面。本文将…...
使用huggingface_hub需要注意的事项
在安装huggingface_hub的时候要注意如果你的python是放在c盘下时记得用管理员模式命令行来安装huggingface_hub,否则安装过程会报错,之后也不会有huggingface-cli命令。 如果安装时因为没有用管理员权限安装而报错了,可以先卸载huggingface-…...
Matplotlib核心课程-2
4.1 数据加载、储存 4.1.1 从数据文件读取数据 导入支持库: import numpy as np from pandas import Series,DataFrame import pandas as pd 从csv文件读取数据,一般方法: pd.read_csv(../data/ex1.csv,encodinggbk) 从csv文件读取数据&#…...
友元函数和友元类
友元 友元是 C 提供的一种 打破封装 的机制,允许 友元函数 或 友元类 访问某个类的 非公有成员(private/protected)。 友元函数 友元函数 可以 直接访问 类的所有 成员,它是 定义在类外部 的 普通函数 ,不属于任何类…...
5.2刷题
P1064 [NOIP 2006 提高组] 金明的预算方案 背包+附属品DP #include<bits/stdc.h> using namespace std; #define int long long int n, m, v, p, q; struct node{int id, v, s, f; }a[100]; int b[32010], dp[32010]; bool cmp(node a, node b){if(a.id b.…...
用VNA进行天线阻抗匹配的实例大图
比如我这天线,在7Mhz时不谐振,我进行匹配 天线的阻抗很高,大约是在500-1400欧,而等效电容电感很小。 所以我考虑使用阻抗变压器降低阻抗。 1。测试天线阻抗,电阻相当高,等效电容很小。 2。通过磁环匹配到…...
普通IT的股票交易成长史--20250502 突破(1)
声明:本文章的内容只是自己学习的总结,不构成投资建议。文中观点基本来自yt站方方土priceaction,综合自己的观点得出。感谢他们的无私分享。 送给自己的话: 仓位就是生命,绝对不能满仓!!&#…...
[预备知识]5. 优化理论(一)
优化理论 梯度下降(Gradient Descent) 数学原理与可视化 梯度下降是优化领域的基石算法,其核心思想是沿负梯度方向迭代更新参数。数学表达式为: θ t 1 θ t − α ∇ θ J ( θ t ) \theta_{t1} \theta_t - \alpha \nabla…...
AI人工智能的接入和使用
缘起 从参加工作开始就在从事AI的落地和接入,到现在已经25年了。所以对AI一直有种情怀,还写了一系列的《基于语音识别的智能电子病历》的文章,记录了这条路上的潮起潮落。 年少多痴狂 2015年开始负责开发语音识别引擎语义分析,…...
QT6(32)4.5常用按钮组件:Button 例题的代码实现
(103) 先设置对齐: 再设置粗体、斜体、下划线: 给出这三个按钮的源码; 颜色按钮的实现 : 至此完结,谢谢老师们的无私教导。 (104) 谢谢...
B站Michale_ee——ESP32_IDF SDK——FreeRTOS_8 消息缓冲区
Message Buffer(消息缓冲区)与Stream Buffer(流数据缓冲区)类似,但有2点不同: Message Buffer每次只接收1次完整的Message;Message Buffer接收缓冲区小于1条Message大小时,会接收不到数据&#…...
DarkGS:论文解读与全流程环境配置及数据集测试【基于Ubuntu20.04 】【2025最新实战无坑版!!】
一、背景及意义 DarkGS是一个创新性的研究项目,旨在解决机器人在黑暗或低光照环境中探索的问题。传统的3D重建和视觉定位系统在光照条件不佳时表现不佳,这严重限制了机器人在黑暗环境中的应用,如夜间救援、深海探索或洞穴勘测等场景。 这项工…...
【大模型面试每日一题】Day 6:分布式训练中 loss 出现 NaN,可能原因及排查方法?
【大模型面试每日一题】Day 6:分布式训练中 loss 出现 NaN,可能原因及排查方法? 📌 题目重现 🌟🌟 面试官:你在使用 PyTorch 进行大规模语言模型的分布式训练时,发现 loss 变成 Na…...
[面试]SoC验证工程师面试常见问题(二)
SoC验证工程师面试常见问题(二) 摘要:面试SoC验证工程师时,SystemVerilog (SV) 和 UVM (Universal Verification Methodology) 是核心技能,而AXI总线是现代SoC中最常见的接口协议之一,因此也是必考点。以下是可能被问到的问题及优质答案的详细列表: 一、 System…...
BLE协议栈的解析
目录 概述 1 BLE协议栈层次结构 1.1 控制器(Controller) 1.2 主机(Host) 1.3 应用层(Application) 1.3.1 业务层功能 1.3.2 实现方法 2 重要属性介绍 2.1 GATT属性 2.2 服务(Service) 2.3 特征值…...
中小企业MES系统需求文档
适用对象:中小型离散制造企业(年产值1-5亿,员工200-800人) 版本:V1.0 日期:2025年5月2日 一、业务背景与目标 1.1 现状痛点 生产黑箱化:车间进度依赖人工汇报,异常响应延迟>2小…...
邹晓辉教授十余年前关于围棋程序与融智学的思考,体现了对复杂系统本质的深刻洞察,其观点在人工智能发展历程中具有前瞻性意义。我们可以从以下三个维度进行深入解析:
邹晓辉教授十余年前关于围棋程序与融智学的思考,体现了对复杂系统本质的深刻洞察,其观点在人工智能发展历程中具有前瞻性意义。我们可以从以下三个维度进行深入解析: 一、围棋程序的二元解构:数据结构与算法的辩证关系 1.1.形式…...
JAVA继承详细总结
看前摇一摇这篇文章:java 继承 补充:子类能继承父类中的哪些内容? - 小澳子 - 博客园 构造方法的继承规则 Java 中构造方法不会被子类继承。JLS 明确指出“构造方法不是类的成员,因此永远不会被继承”docs.oracle.com。博客原文在“继承内存…...
AntSK:基于大模型的一体化AI知识库解决方案深度解析
随着大模型(如GPT、LLM)技术的飞速发展,企业对智能知识管理和专属AI助手的需求日益增长。AntSK 正是在这一背景下诞生的企业级AI一体机解决方案。本文将从技术架构、核心功能、创新点和应用场景等方面,深入解析 AntSK 如何助力企业…...
C++11新特性_标准库_std::array
std::array 是 C11 标准库引入的一个容器,用于表示固定大小的数组。它定义在 <array> 头文件中。下面为你详细介绍其优势和使用方法。 优势 1. 类型安全 与传统的 C 风格数组不同,std::array 是一个模板类,它的类型信息在编译时就已…...
【AI面试准备】数据治理与GDPR脱敏机制构建
介绍数据治理:构建符合GDPR的测试数据脱敏机制。如何快速掌握,以及在实际工作中如何运用。 数据治理是确保数据质量、安全性和合规性的系统性方法,而构建符合GDPR(《通用数据保护条例》)的测试数据脱敏机制是其中的关…...