Spring Cloud Stream集成RocketMQ(kafka/rabbitMQ通用)
什么是Spring Cloud Stream
Spring Cloud Stream 是 Spring 生态系统中的一个框架,用于简化构建消息驱动微服务的开发和集成。它通过抽象化的方式将消息中间件(如 RabbitMQ、Kafka、RocketMQ 等)的复杂通信逻辑封装成简单的编程模型,使开发者能够专注于业务逻辑,而无需过多关注底层消息系统的实现细节。
详细解释
这里是官网代码中推出的解释:
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。
Binding: 包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
总结:
Binder:解决“用什么消息中间件”的问题(如 Kafka vs RabbitMQ)。
Binding:解决“消息从哪里来、到哪里去”的问题(如 Topic 名称、消费者组)。
协作关系:
下图是 Spring Cloud Stream 的架构设计。
+-------------------+ +-------------------+
| Application | | Application |
| (Microservice) | | (Microservice) |
+-------------------+ +-------------------+| || Output Binding (Producer) | Input Binding (Consumer)↓ ↓
+--------------------------------------------------+
| Binder (抽象层) |
| (Kafka/RabbitMQ/RocketMQ 的适配实现) |
+--------------------------------------------------+| |↓ ↓
+-------------------+ +-------------------+
| Message Broker | | Message Broker |
| (e.g., Kafka) | | (e.g., RabbitMQ) |
+-------------------+ +-------------------+
业务代码 → Binding(定义通道) → Binder(连接中间件) → 消息中间件
根据官网文档,集成了下面这些消息中间件或者流事件平台。这里用rokectMQ举例
使用说明
下载github中的rocketMQ代码
1.首先点开下面github中的RocketMQ示例代码
直接下载全部 直接看examples
下载代码,可以看见很多示例,下面以(orderly 顺序消息)说明
这里他直接在启动类中简单实现了生产数据和消费数据的代码
并且带有说明 见readme
代码说明(orderly顺序消费)
@SpringBootApplication
public class RocketMQOrderlyConsumeApplication {private static final Logger log = LoggerFactory.getLogger(RocketMQOrderlyConsumeApplication.class);@Autowiredprivate StreamBridge streamBridge;/**** tag array.*/public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};public static void main(String[] args) {SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args);}@Beanpublic ApplicationRunner producer() {return args -> {for (int i = 0; i < 100; i++) {String key = "KEY" + i;Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_KEYS, key);headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);streamBridge.send("producer-out-0", msg);}};}@Beanpublic Consumer<Message<SimpleMsg>> consumer() {return msg -> {String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(MessageConst.PROPERTY_TAGS).toString();log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +msg.getHeaders().get(tagHeaderKey).toString());try {Thread.sleep(100);}catch (InterruptedException ignored) {}};}}
配置文件
server:port: 28082
spring:application:name: rocketmq-orderly-consume-examplecloud:stream:function:definition: consumer;rocketmq:binder:name-server: localhost:9876bindings:producer-out-0:producer:group: output_1messageQueueSelector: orderlyMessageQueueSelectorconsumer-in-0:consumer:# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .subscription: 'TagA || TagC || TagD'push:orderly: truebindings:producer-out-0:destination: orderlyconsumer-in-0:destination: orderlygroup: orderly-consumerlogging:level:org.springframework.context.support: debug
配置文件解释
主要配置
绑定服务器
rocketmq:binder:name-server: localhost:9876 //RocketMQ 的 NameServer 地址为 localhost:9876,用于获取 Broker 路由信息。意思是这里只有一个broker,并不是集群配置
生产者消费者配置
bindings:producer-out-0:producer:group: output_1 //生产者组:生产者组名为 output_1,用于事务消息或消息查询。messageQueueSelector: orderlyMessageQueueSelector //队列选择器:使用自定义的 orderlyMessageQueueSelector 选择消息队列,确保相同业务标识的消息发往同一队列,实现顺序性。
consumer-in-0:consumer:subscription: 'TagA || TagC || TagD'//订阅过滤:使用 Tag 过滤,订阅包含 TagA、TagC 或 TagD 的消息(逻辑或)。push:orderly: true//顺序消费:push.orderly: true 启用顺序消费模式,按队列顺序单线程处理消息。
生产者消费者绑定组和目的地
bindings:producer-out-0:destination: orderly //生产者目的地:生产者发送至 Topic 为 orderly。consumer-in-0:destination: orderlygroup: orderly-consumer //消费者组:消费者组名为 orderly-consumer,相同组内消费者分摊消费队列,不同组独立消费。
producer
这里逻辑很简单,就循环发送了100条数据,顺序发送给不同的tags,组装成了Message对象,然后通过streamBridge发送到producer-out-0的通道
@Bean
public ApplicationRunner producer() {return args -> {for (int i = 0; i < 100; i++) {String key = "KEY" + i;// 设置 RocketMQ 消息头Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_KEYS, key); // 消息的唯一标识(RocketMQ 的 KEY)headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]); // 消息的 Tag(按 tags 数组循环分配)headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); // 自定义原始消息ID(可选)// 创建消息对象:包含 payload 和 headersMessage<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("Hello RocketMQ " + i), headers);// 发送消息到名为 "producer-out-0" 的输出通道streamBridge.send("producer-out-0", msg);}};
}
selector(供生产者使用)
OrderlyMessageQueueSelector 的作用是 供生产者使用的,用于在发送顺序消息时选择特定的消息队列(MessageQueue),确保同一业务逻辑的消息被发送到同一个队列中,从而保证消费者能够按顺序消费。
@Component
public class OrderlyMessageQueueSelector implements MessageQueueSelector {private static final Logger log = LoggerFactory.getLogger(OrderlyMessageQueueSelector.class);/*** to select a fixed queue by id.* @param mqs all message queues of this topic.//当前主题(Topic)下的所有队列* @param msg mq message.//这是即将被消费的消息对象。它包含了消息的内容、属性和一些元数据。* @param arg mq arguments.//这个参数是消费者传入的自定义参数,通常用来携带一些额外的信息。* @return message queue selected.*/@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size(); //id%5%队列长度return mqs.get(index);}
}
consumer
1.每个队列由独立线程顺序消费。
2.同一队列中的消息按发送顺序处理,不同队列的消息可能并行处理。
@Beanpublic Consumer<Message<SimpleMsg>> consumer() {return msg -> {String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(MessageConst.PROPERTY_TAGS).toString();log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +msg.getHeaders().get(tagHeaderKey).toString());try {Thread.sleep(100);}catch (InterruptedException ignored) {}};}
因为前面设计了selector,所以这里的消费结构应该是
假如这里的队列是默认的4
Thread-0 Receive: Hello RocketMQ 0 TAG:TagA
Thread-0 Receive: Hello RocketMQ 4 TAG:TagE
Thread-0 Receive: Hello RocketMQ 5 TAG:TagA
Thread-0 Receive: Hello RocketMQ 9 TAG:TagE
...(后续i=10,14,15...)
Thread-1 Receive: Hello RocketMQ 1 TAG:TagB
Thread-1 Receive: Hello RocketMQ 6 TAG:TagB
Thread-1 Receive: Hello RocketMQ 11 TAG:TagB
...(后续i=16,21...)
Thread-2 Receive: Hello RocketMQ 2 TAG:TagC
Thread-2 Receive: Hello RocketMQ 7 TAG:TagC
Thread-2 Receive: Hello RocketMQ 12 TAG:TagC
...(后续i=17,22...)
Thread-3 Receive: Hello RocketMQ 3 TAG:TagD
Thread-3 Receive: Hello RocketMQ 8 TAG:TagD
Thread-3 Receive: Hello RocketMQ 13 TAG:TagD
...(后续i=18,23...)
同一个队列中消息是顺序的,这里的thread0中有A,E两个标签,如果要避免这种情况,应该把队列设置为Tags.size的长度
他这里的设计应该就是为了尽可能的将不同标签分布在不同的队列,最终形成同一队列对应同一标签,然后实现顺序消费
实际开发案例(支付订单)说明
有了上面的案例下面我理解起来就方便很多了
注意:下面代码并不完整,只是一个大致逻辑说明
这里以支付订单案例说明
下面是代码前置,就是一个创建订单的流程,有兴趣的可以了解下,不然可以直接跳过看生产者消费者配置
一般咱们支付之前都会先生成订单,参数除了正常的支付单号,支付时间这些基本的东西外有一个支付倒计时这个功能,这个支付倒计时一般是咱们后台给配置的:这里我举个例,比如说后台模板中配置了1.消费下单:15分钟、2.通联支付:30分钟等等,这里我们会根据支付单号查询数据库对应的支付倒计时,这里超时咱们就可以用rockeMQ中延时队列来进行处理
下面代码可以不看,就是一个创建支付单的流程
- 检测订单是否存在
- 获取收益台模板(就是上面说的获取倒计时等数据这样一个东西)
- 先存数据库(防止前端多次下单)后支付的时候再调用第三方接口(也可以是直接对接银行)
- 存完设置redis缓存信息,防止多次创建订单
- 将订单数据假如延迟队列
@Overridepublic BillsPlan save(PaymentBillsDTO paymentBillsDTO) {String key = redisUtil.get("order:" + paymentBillsDTO.getBusinessOrderNo());if (key != null) {log.info("支付订单已创建,请前往收银台支付!");throw ExFactory.bizException(PaymentError.PAYMENT_BILL_COLLECTING);}// 检查订单是否存在PaymentBills byId = this.getOne(Wrappers.<PaymentBills>lambdaQuery().eq(PaymentBills::getBusinessOrderNo, paymentBillsDTO.getBusinessOrderNo()).eq(PaymentBills::getPaymentBillStatus, AgentCollectStatusEnum.COLLECT_SUCCESS.getStatus()).last("limit 1"));if (byId != null) {throw ExFactory.bizException(PaymentError.PAYMENT_BILL_FINISHED);}// 获取收银台PaymentTransactionType xiaofeixiadan = paymentTransactionTypeService.getOne(Wrappers.<PaymentTransactionType>lambdaQuery().eq(PaymentTransactionType::getTypeCode, "xiaofeixiadan"));if (Objects.isNull(xiaofeixiadan)) {throw ExFactory.bizException(PaymentError.PAYMENT_TRANSACTION_TYPE_NOT_EXIST);}Integer typeId = xiaofeixiadan.getId();CashierTemplate cashierTemplate = cashierTemplateService.getOne(Wrappers.<CashierTemplate>lambdaQuery().eq(CashierTemplate::getTransactionTypeId, typeId));if (Objects.isNull(cashierTemplate)) {throw ExFactory.bizException(PaymentError.CASHIER_TEMPLATE_NOT_EXIST);}Integer delayLevel;try {delayLevel = RocketMqDelayLevelEnum.getLevelByMinutes(cashierTemplate.getPaymentCountdown());} catch (Exception e) {throw ExFactory.bizException(PaymentError.CASHIER_TEMPLATE_BAD_TIMEOUT_PARAM);}// 保存数据到payment_bills表PaymentBills paymentBills = PaymentBillsConverter.INSTANCE.from(paymentBillsDTO);paymentBills.setPaymentBillType(TransactionTypeEnum.PAY.getCode());paymentBills.setPaymentBillStatus("1");this.save(paymentBills);// 保存数据到payment_bills_plan表,TODO 根据活动判断是否需要生成多条支付计划,目前只生成一条BillsPlan billsPlan = new BillsPlan();BeanUtil.copyProperties(paymentBillsDTO, billsPlan);billsPlan.setPaymentBillId(paymentBills.getPaymentBillId());billsPlan.setPricingSource("银行卡支付");billsPlan.setPaymentBillStatus("1");billsPlan.setPaymentBillType(TransactionTypeEnum.PAY.getCode());billsPlanService.save(billsPlan);// 记录第三方支付单PaymentThirdBills paymentThirdBills = new PaymentThirdBills();BeanUtil.copyProperties(billsPlan, paymentThirdBills);paymentThirdBills.setChannel("1");paymentThirdBillsService.save(paymentThirdBills);// redis设置订单失效时间redisUtil.set("order:" + paymentBills.getBusinessOrderNo(), String.valueOf(paymentBills.getPaymentBillId()), 30, TimeUnit.MINUTES);redisUtil.expire("order:" + paymentBills.getBusinessOrderNo(), 30, TimeUnit.MINUTES);// 发送延迟消息用于处理超时订单MessageDTO messageDTO = new MessageDTO();messageDTO.setDataJson(String.valueOf(paymentBills.getPaymentBillId()));messageDTO.setTag("payment");messageDTO.setType(AsyncExecuteTypeEnums.DELAY_CHECK_PAYMENT_RESULT.getType());producer.sendDelayMessage(messageDTO, delayLevel);inspectPayScheduleRpcServiceI.updateInspectPayScheduleStatus(paymentBills.getBusinessOrderNo(), 1);return billsPlan;}
producer
这里跟之前的案例没什么区别,都是streamBridge来发送消息,只不过这里是发送延时(延迟)消费,rocketMQ会根据设置的等级来设置延时时间
@RefreshScope
@Service
@Slf4j
public class RocketMqProducer {@Resourceprivate StreamBridge streamBridge;@Value("${spring.cloud.stream.paymentProducer}") // 在nacos中读取配置private String messageProducer;public <T> void sendMqMessage(MessageDTO dto) {streamBridge.send(messageProducer,MessageBuilder.withPayload(dto).setHeader(MessageConst.PROPERTY_TAGS, dto.getTag()).setHeader(MessageConst.PROPERTY_KEYS, dto.getType()).build());}public void sendDelayMessage(MessageDTO dto, Integer delayLevel) {// 创建消息头,设置延迟级别Map<String, Object> headers = new HashMap<>();headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(delayLevel));headers.put(MessageConst.PROPERTY_TAGS,dto.getTag());headers.put(MessageConst.PROPERTY_KEYS,dto.getType());// 创建消息Message<MessageDTO> message = MessageBuilder.withPayload(dto).copyHeaders(headers).build();// 使用StreamBridge发送消息boolean sent = streamBridge.send(messageProducer, message);if (sent) {System.out.println("延迟消息发送成功");log.info("当前秒数:{}", LocalDateTime.now().getSecond());} else {System.out.println("延迟消息发送失败");}}}
nacos中的静态配置
# 配置 rocketmq 的 nameserver 地址
spring.cloud.stream.rocketmq.binder.name-server=******
spring.cloud.stream.rocketmq.producer.send-type=ASYNC
# 定义 通道 为 paymentProducer 的 生产者,paymentTransactionProducer为有事务的生产者
spring.cloud.stream.paymentProducer=paymentProducer-out-0
spring.cloud.stream.bindings.paymentProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.paymentProducer-out-0.content-type=application/json
spring.cloud.stream.bindings.paymentProducer-out-0.destination=payment-topic
spring.cloud.stream.paymentTransactionProducer=paymentTransactionProducer-out-0
spring.cloud.stream.bindings.paymentTransactionProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.paymentTransactionProducer-out-0.content-type=application/json
spring.cloud.stream.bindings.paymentTransactionProducer-out-0.destination=payment-topic
spring.cloud.stream.rocketmq.bindings.paymentTransactionProducer-out-0.producer.producerType=Trans
spring.cloud.stream.rocketmq.bindings.paymentTransactionProducer-out-0.producer.transactionListener=RocketMqTransactionListener# 定义 通道 为 paymentConsumer 的 消费者,tags 定义只接受 payment和all 消息
spring.cloud.stream.bindings.paymentConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.paymentConsumer-in-0.content-type=application/json
spring.cloud.stream.bindings.paymentConsumer-in-0.destination=payment-topic
spring.cloud.stream.bindings.paymentConsumer-in-0.group=payment-customer-group
spring.cloud.stream.rocketmq.bindings.paymentConsumer-in-0.consumer.group=payment-customer-group
spring.cloud.stream.rocketmq.bindings.paymentConsumer-in-0.consumer.subscription=payment||all
spring.cloud.stream.rocketmq.bindings.paymentConsumer-in-0.consumer.messageModel=CLUSTERING
这里和案例中的都差不多
consumer
首先定义paymentConsumer的bean对象来接收名为paymentConsumer的topic,编程式事务根据不同的类型来执行不同的方法
着重看注释的地方
1.service.execute(dto);
2.getData(dto);//获取数据参数类型
3. asyncExcute(data);//根据参数类型进行重载
@Slf4j
@Configuration
public class AsyncExecuteConsumer {@Value("${payment.asyncMsg.maxRetryCount}")private Integer maxRetryCount;@Resourceprivate AsyncRetryInfoMapper asyncRetryInfoMapper;@Resourceprivate TransactionTemplate transactionTemplate;@Beanpublic Consumer<MessageDTO> paymentConsumer() {return message -> {log.info("paymentConsumer接到消息:{}", message);handleMessage(message);};}public void handleMessage(MessageDTO dto) {log.info("异步执行流程 接收MQ Content:{}", JSON.toJSONString(dto));if (StringUtils.isEmpty(dto.getType())){log.error("MQ消息type类型为空");return;}AsyncExecuteTypeEnums byType = AsyncExecuteTypeEnums.getByType(dto.getType());if(Objects.isNull(byType)){log.error("MQ消息type类型错误:{}", dto.getType());return;}AsyncExecuteService service = AsyncExecuteService.getService(byType); //这里通过类型获取对应的执行service对象if (Objects.nonNull(service)) {try {// 使用编程式事务确保事务正确传播transactionTemplate.execute(status -> {try {service.execute(dto);return true;} catch (Exception e) {status.setRollbackOnly();throw e;}});} catch (Exception e) {log.error("{}异步任务执行异常:{}",byType.getDesc(),e);// 记录异常信息if(checkRetryCount(dto.getCurrRetryCount())){dto.setErrorMsg(e.getCause().getMessage());dto.setCurrRetryCount(dto.getCurrRetryCount()+1);// 重试log.info("{}异步任务第{}次重试",byType.getDesc(), dto.getCurrRetryCount());handleMessage(dto);`在这里插入代码片`}else{//记录异常到数据库log.info("{}异步任务达到最大重试次数,入库",byType.getDesc());asyncRetryInfoMapper.insert(MsgRetryConverter.INSTANCE.toRetry(dto));}}}}/*** 检查是否可重试* @param currentRetryCount* @return*/public Boolean checkRetryCount(Integer currentRetryCount) {currentRetryCount++;return currentRetryCount <= maxRetryCount;}}
这里通过枚举定义了3个类型 入账,支付,提现
@Getter
public enum AsyncExecuteTypeEnums {/*** 入账*/ACCOUNTING("accouting", "入账"),DELAY_CHECK_PAYMENT_RESULT("delayCheckPaymentResult", "延迟检测支付结果"),DELAY_CHECK_WITHDRAW_RESULT("delayCheckWithdrawResult", "延迟检测提现结果"),;private final String type;private final String desc;AsyncExecuteTypeEnums(String type, String desc) {this.type = type;this.desc = desc;}public static AsyncExecuteTypeEnums getByType(String type) {for (AsyncExecuteTypeEnums asyncExecuteTypeEnums : AsyncExecuteTypeEnums.values()) {if (asyncExecuteTypeEnums.getType().equals(type)) {return asyncExecuteTypeEnums;}}return null;}
}
@Slf4j
public abstract class AsyncExecuteService<T> {/*** Service仓库*/protected static Map<AsyncExecuteTypeEnums, AsyncExecuteService> SERVICES = new HashMap<>();/*** 获取Service** @param type 类型* @return Service*/public static AsyncExecuteService getService(AsyncExecuteTypeEnums type) {return SERVICES.get(type);}/*** 执行流程** @param dto 参数*/@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)public void execute(MessageDTO dto) {try {T data = getData(dto); //获取对应的类型,以便根据业务执行不同的代码if(bizVerify(data)){asyncExcute(data);}} catch (Exception e) {log.error("执行异步任务时发生异常:{}", e);throw e;}}/*** 获取数据** @param dto 参数* @return 结果*/protected T getData(MessageDTO dto) {try {ParameterizedType parameterizedType = (ParameterizedType) this.getClass().getGenericSuperclass(); //this.getClass().getGenericSuperclass() 获取当前类的泛型父类类型,即 AsyncExecuteService<T>。@SuppressWarnings("unchecked")Class<T> clazz = (Class<T>) parameterizedType.getActualTypeArguments()[0];//parameterizedType.getActualTypeArguments()[0] 获取泛型参数 T 的实际类型。if(clazz.isInstance(String.class)) {return (T) dto.getDataJson();}return JSON.parseObject(dto.getDataJson(), clazz);//使用 JSON.parseObject(dto.getDataJson(), clazz) 将 dataJson 字符串转换为指定的类型 T。}catch (Exception e) {log.error("异步执行流程 转换数据异常 数据:{}", dto, e);throw new RuntimeException("数据转换异常", e);}}/*** 初始化Factory*/@PostConstructprotected abstract void registerService();/*** 验证业务上的事务是否提交** @param dto 参数*/protected abstract Boolean bizVerify(T dto);/*** 执行核心业务** @param dto 参数*/protected abstract void asyncExcute(T dto);}
这里继承了AsyncExecuteService这个抽象类用于实现具体执行体
@Slf4j
@Service
public class PaymentBillsDelayConsumer extends AsyncExecuteService<String> {@Lazy@Resourceprivate PaymentBillsService paymentBillsService;@Lazy@Resourceprivate BillsPlanService billsPlanService;@Lazy@Resourceprivate PaymentThirdBillsService paymentThirdBillsService;@Lazy@Resourceprivate PaymentRequestService paymentRequestService;@Lazy@Resourceprivate RedisUtil redisUtil;@Lazy@Resourceprivate AllinPayService allinPayService;@Lazy@DubboReferenceprivate InspectPayScheduleRpcServiceI inspectPayScheduleRpcService;@Overrideprotected void registerService() {SERVICES.put(AsyncExecuteTypeEnums.DELAY_CHECK_PAYMENT_RESULT, this);}@Overrideprotected Boolean bizVerify(String paymentBillId) {PaymentBills paymentBills = paymentBillsService.getById(Long.valueOf(paymentBillId));if (Objects.isNull(paymentBills)) {log.error("支付订单id:{} 不存在", paymentBillId);return false;}return true;}@Overrideprotected void asyncExcute(String paymentBillId) {log.info("支付订单id:{} 开始执行订单超时处理", paymentBillId);log.info("当前秒数:{}", LocalDateTime.now().getSecond());PaymentBills paymentBills = paymentBillsService.getById(Long.valueOf(paymentBillId));// 删除redis缓存的keyLong businessOrderNo = paymentBills.getBusinessOrderNo();if(redisUtil.hasKey("order:"+businessOrderNo)) {redisUtil.delete("order:"+businessOrderNo);}if (!paymentBills.getPaymentBillStatus().equals(AgentCollectStatusEnum.COLLECTING.getStatus())) {log.info("支付订单id:{} 已支付或已进行超时处理", paymentBillId);return;}// 将支付中的订单/计划单/支付单/支付请求的状态修改为超时// 订单paymentBills.setPaymentBillStatus(AgentCollectStatusEnum.COLLECT_TIMEOUT.getStatus());paymentBillsService.updateById(paymentBills);// 计划单List<BillsPlan> billsPlans = billsPlanService.list(Wrappers.<BillsPlan>lambdaQuery().eq(BillsPlan::getPaymentBillId, paymentBillId).eq(BillsPlan::getPaymentBillStatus, AgentCollectStatusEnum.COLLECTING.getStatus()));if(CollectionUtils.isNotEmpty(billsPlans)) {billsPlans.forEach(billsPlan -> {billsPlan.setPaymentBillStatus(AgentCollectStatusEnum.COLLECT_TIMEOUT.getStatus());});billsPlanService.updateBatchById(billsPlans);paymentRequestService.update(Wrappers.<PaymentRequest>lambdaUpdate().in(PaymentRequest::getPaymentPlanId, billsPlans.stream().map(BillsPlan::getPaymentPlanId).toList()).eq(PaymentRequest::getPaymentStatus, AgentCollectStatusEnum.COLLECTING.getStatus()).set(PaymentRequest::getPaymentStatus, AgentCollectStatusEnum.COLLECT_TIMEOUT.getStatus()));}// 支付单paymentThirdBillsService.update(Wrappers.<PaymentThirdBills>lambdaUpdate().eq(PaymentThirdBills::getPaymentBillId, paymentBillId).eq(PaymentThirdBills::getPaymentBillStatus, AgentCollectStatusEnum.COLLECTING.getStatus()).set(PaymentThirdBills::getPaymentBillStatus, AgentCollectStatusEnum.COLLECT_TIMEOUT.getStatus()));// 将B3的支付状态修改为支付超时// 支付请求单
// // 调用第三方接口将支付中的请求单关闭
// paymentRequestService.list(Wrappers.<PaymentRequest>lambdaQuery()
// .in(PaymentRequest::getPaymentPlanId, billsPlans.stream().map(BillsPlan::getPaymentPlanId).toList())
// .eq(PaymentRequest::getPaymentStatus, AgentCollectStatusEnum.COLLECTING.getStatus()))
// .forEach(paymentRequest -> {
// try {
// JSONObject object = allinPayService.closeOrder(paymentRequest.getPaymentRequestId() + "");
// log.info("支付请求id:{} 关闭结果:{}", paymentRequest.getPaymentRequestId(), object);
// }catch (Exception e){
// log.error("支付请求id:{} 关闭失败", paymentRequest.getPaymentRequestId());
// }
// });// 修改B3付款状态为待支付inspectPayScheduleRpcService.updateInspectPayScheduleStatus(paymentBills.getBusinessOrderNo(), 0);}
}
总结
以上就是spring cloud stream 集成rocketmq的全部,像使用事务消息,获取其他可以继续看看文档,写得还是比较好理解,同理的如果想集成kafka,rabbitMQ,也可以下载案例进行参考
相关文章:
Spring Cloud Stream集成RocketMQ(kafka/rabbitMQ通用)
什么是Spring Cloud Stream Spring Cloud Stream 是 Spring 生态系统中的一个框架,用于简化构建消息驱动微服务的开发和集成。它通过抽象化的方式将消息中间件(如 RabbitMQ、Kafka、RocketMQ 等)的复杂通信逻辑封装成简单的编程模型…...
基于docker使用showdoc搭建API开发文档服务器
以下是基于 Docker 快速搭建 ShowDoc API 文档服务器的完整指南,包含优化配置和常见问题解决方案: 1. 快速部署方案 # 创建数据目录(确保权限) mkdir -p /showdoc_data/html && chmod 777 -R /showdoc_data# 一键启动容器…...
Vision-Language Models (VLMs) 视觉语言模型的技术背景、应用场景和商业前景(Grok3 DeepSearch模式回答)
prompt: 你是一位文笔精湛、十分专业的技术博客作者,你将从技术背景、应用场景和商业前景等多个维度去向读者介绍Vision-Language Models 关键要点 研究表明,视觉语言模型(VLMs)是多模态AI系统,能同时处理视觉和文本数…...
OpenAI大变革!继续与微软等,以非营利模式冲击AGI
今天凌晨2点,OpenAI宣布,将继续由非营利组织控制;现有的营利性实体将转变为一家公共利益公司;非营利组织将控制该公共利益公司,并成为其重要的持股方。 这也就是说OpenAI曾在去年提到的由非营利性转变成营利性公司&am…...
Ubuntu打开中文文本乱码
文章目录 中文乱码问题修复乱码系统字符编码修改文本编码修改vim乱码 utf-8编码原理特点应用场景与其他编码的转换 iso-8859-1基本信息字符涵盖应用场景与其他编码的关系 ubuntu打开文本出现乱码,可能是编码没设置对。 中文乱码问题 使用vim打开文本,或…...
车载通信网络安全:挑战与解决方案
1. 简介 当今时代见证了车载汽车技术的巨大发展,因为现代智能汽车可以被视为具有出色外部基础设施连接能力的信息物理系统 [ 1 ]。车载技术支持的现代智能汽车不应被视为类似于机械系统,而是由数百万行复杂代码组成的集成架构,可为车内乘客提…...
【Linux系统】读写锁
读者写者问题 重点 读者写者问题是并发编程中的经典问题,主要研究多个进程或线程对共享数据进行读和写操作时如何实现同步和互斥,以保证数据的一致性和操作的正确性 。 问题核心要点 同步与互斥:需要确保多个读者可以同时读共享数据&#…...
springBoot中自定义一个validation注解,实现指定枚举值校验
缘由 在后台写接口的时候,经常会出现dto某个属性是映射到一个枚举的情况。有时候还会出现只能映射到枚举类中部分枚举值的情况。以前都是在service里面自行判断,很多地方代码冗余,所以就想着弄一个自定义的validation注解来实现。 例如下面某…...
【Python】--装饰器
装饰器(Decorator)本质上是一个返回函数的函数 主要作用是:在不修改原函数代码的前提下,给函数增加额外的功能 比如:增加业务,日志记录、权限验证、执行时间统计、缓存等场景 my_decorator def func():pas…...
排序算法——堆排序
一、介绍 「堆排序heapsort」是一种基于堆数据结构实现的高效排序算法。我们可以利用已经学过的“建堆操作”和“元素出堆操作”实现堆排序。 1. 输入数组并建立小顶堆,此时最小元素位于堆顶。 2. 不断执行出堆操作,依次记录出堆元素,即可得…...
Day111 | 灵神 | 二叉树 | 验证二叉搜索树
Day111 | 灵神 | 二叉树 | 验证二叉搜索树 98.验证二叉搜索树 98. 验证二叉搜索树 - 力扣(LeetCode) 方法一:前序遍历 递归函数传入合法的左右边界,只有当前结点是合法的边界,才是二叉搜索树,否则就返回…...
软考-软件设计师中级备考 13、刷题 数据结构
倒计时17天时间不多了,数据库、UML、等知识点有基础直接略过,法律全靠考前的一两天刷题,英语直接放弃。 一、数据结构:链表、栈、队列、数组、哈希表、树、图 1、关于链表操作,说法正确的是: A)新增一个头…...
【5G通信】天线调整
在天线工程中,机械下倾角、电子下倾角和数字下倾角是调整天线波束指向的不同技术手段,其核心区别在于实现方式和灵活性: 1. 机械下倾角(Mechanical Downtilt) 定义:通过物理调整天线的安装角度,…...
Kafka的Log Compaction原理是什么?
Kafka的Log Compaction(日志压缩)是一种独特的数据保留策略,其核心原理是保留每个key的最新有效记录。以下是关键原理分点说明: 1. 键值保留机制 通过扫描所有消息的key,仅保留每个key对应的最新value值。例如&#…...
嵌入式面试八股文(十四)·内存管理机制、优先级继承机制以及优先级翻转
目录 1. 内存管理算法(五种内存管理机制) 1.1 heap_1.c 1.2 heap_2.c 1.3 heap_3.c 1.4 heap_4.c 1.5 heap_5.c 1.6 总结 2. STM32通知寄存器有哪些? 2.1 核心寄存器组(Cortex-M) 2.2 特殊功能寄存…...
深度剖析:可视化如何重塑驾驶舱信息交互模式
为什么你开车时总觉得“信息太多却抓不住重点”? 今天的汽车早已不是单纯的交通工具,而是一个高度集成的信息终端。从导航、油耗、胎压到自动驾驶提示,各种数据不断涌进驾驶舱。 但问题也随之而来: 关键信息被淹没在一堆图标里…...
app根据蓝牙名字不同,匹配不同的产品型号,显示对应的UI界面
在开发一个 App 时,如果希望根据蓝牙设备名称(Bluetooth Name)的不同,自动匹配不同的产品型号,并显示对应的 UI 界面,可以按照以下思路来实现: ✅ 功能目标 扫描并连接蓝牙设备;获取…...
数据结构 --- 栈
1.栈的初始化 2.入栈 3.出栈 4.取出栈顶元素 5.获取栈中有效元素个数 6.栈的销毁 栈:⼀种特殊的线性表,其只允许在固定的⼀端进⾏插⼊和删除元素操作。进⾏数据插⼊和删除操作 的⼀端称为栈顶,另⼀端称为栈底。栈中的数据元素遵守后进先…...
37-算法打卡-栈与队列-滑动窗口最大值-leetcode(239)-第三十七天
1 题目地址 239. 滑动窗口最大值 - 力扣(LeetCode)239. 滑动窗口最大值 - 给你一个整数数组 nums,有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回 滑…...
【原创分享】魔音变声器内含超多语音包实时变声
魔音变声器,一款专业的调音变声器软件 亲测可使用所有功能[真棒] 去除所有广告 ————————————【下 载 地 址】———————————— 【获取方法1】:https://pan.xunlei.com/s/VOP_TXtKNlevTgYvIlxmmJquA1?pwd8vpi# ————————————【下 …...
数据结构(一)——线性表的顺序表示和实现
一、线性表的定义 由n(n>0)个数据特性相同的元素构成的有限序列称为线性表,(n0)的时候被称为空表。 一个数据元素可以是简单的一个数据,一个符号,也可以是复杂的若干个数据项的组合。 二、线性表的类型定义 s线性表是由n(n≥0)个相同类…...
Winform(12.控件讲解)
ChildForm窗口: ChildForm代码: using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespac…...
Python 10天冲刺 《元编程(Meta-programming)》
Python 的元编程(Meta-programming)是指在程序运行期间动态生成、修改或操作代码的技术。它允许开发者通过代码控制代码的行为,从而实现灵活、可扩展和抽象化的编程模式。Python 提供了多种元编程工具,包括装饰器、元类、动态导入…...
Android开发-创建、运行、调试App工程
在移动应用开发的世界里,Android平台凭借其开放性和广泛的设备支持,成为了许多开发者的选择。而要成为一名合格的Android开发者,掌握如何创建、运行以及调试应用程序是必不可少的基础技能。本文将详细介绍如何使用Android Studio完成这些任务…...
系统级编程(二):通过读取PE文件获取EXE或者DLL的依赖
PE文件 Windows的PE文件(Portable Executable)是一种专为Windows操作系统设计的标准可执行文件格式,用于存储和管理可执行程序、动态链接库(DLL)、驱动程序等二进制文件。PE文件格式自Windows NT 3.1引入以来,已成为Windows平台上所有可执行文件的标准格式,并广泛应用于…...
Linux主机时间设置操作指南及时间异常影响
一、Linux主机时间设置命令操作指南 1. 查看当前系统时间与时区 查看当前时间与时区:timedatectl # 显示详细时间与时区信息(systemd系统适用) date # 查看当前系统时间 hwclock --show # 查看硬件时…...
GPS定位方案
目录 一、常用的GPS定位方案包括: 二、主流品牌及热销型号 三、常用GPS算法及核心逻辑: 一、基础定位算法 二、高精度算法 三、辅助优化算法 四、信号处理底层算法 四、基本原理(想自己写算法的琢磨一下原理) 一、常用的GP…...
应对联网汽车带来的网络安全挑战
数字化加速正在彻底改变全球各行各业,而汽车行业更是走在了前列。目前,全球自动驾驶汽车保有量约为4860万辆,预计到2024年将增长至5420万辆。 智能汽车的崛起无疑令人兴奋,但也带来了一系列问题。为了保护客户免受新的威胁,汽车行业必须做出一系列考量:针对自动驾驶、网…...
人工智能与生命科学的深度融合:破解生物医学难题,引领未来科技革命
引言 随着人工智能技术的飞速发展,生命科学领域迎来了前所未有的变革。从药物研发到疾病预测,从个性化医疗到基因组学,AI的深度融入不仅加速了生物医学的进步,还在多个领域打破了传统科学研究的局限,开创了新的医学前沿…...
DeepSeek智能时空数据分析(七):4326和3857两种坐标系有什么区别?各自用途是什么?
序言:时空数据分析很有用,但是GIS/时空数据库技术门槛太高 时空数据分析在优化业务运营中至关重要,然而,三大挑战仍制约其发展:技术门槛高,需融合GIS理论、SQL开发与时空数据库等多领域知识;空…...
Qt/C++面试【速通笔记七】—Qt中为什么new QWidget不需要手动调用delete?
在Qt的开发中,管理内存是一个非常重要的话题,特别是在使用QWidget这类窗口组件时,很多开发者会遇到一个问题:“为什么我使用new QWidget创建的窗口对象不需要手动调用delete进行销毁?”。 1. 父子关系机制:…...
Super-vlan
Super VLAN(VLAN聚合)的理论与配置 1. 基本概念 Super VLAN(超级VLAN)是一种VLAN聚合技术,主要用于解决传统VLAN划分中IP地址浪费的问题。其核心思想是将多个Sub VLAN(子VLAN)聚合到一个Super …...
C——函数
一、函数的概念 数学中我们其实就⻅过函数的概念,⽐如:⼀次函数 y kx b ,k和b都是常数,给⼀个任意的 x,就得到⼀个y值。 其实在C语⾔也引⼊函数(function)的概念,有些翻译为&…...
5.6刷题并查集
P1551 亲戚 #include<bits/stdc.h> using namespace std; const int N 5010; int f[N]; int find(int x){if(f[x] x)return x;return f[x] find(f[x]); } void solve(){int n, m, p; cin >> n >> m >> p;for(int i 1; i < n; i)f[i] i;for(in…...
pcl平面投影
// 创建一个系数为XY0,Z1的平面pcl::ModelCoefficients::Ptr coefficients (new pcl::ModelCoefficients ());coefficients->values.resize (4);coefficients->values[0] coefficients->values[1] 0;coefficients->values[2] 1.0;coefficients->values[3] 0…...
Linux远程管理
如何查看ip 如何使用vim编辑器 如何设置网络信息 远程访问 一:网络管理 (1)获取计算机的网络信息 基本语法: windows ipconfig ifconfig enS33: f1agS4163<UP,BR0ADCAST,RUNNING,MULTICAST> mtu 1500 inet…...
如何添加或删除极狐GitLab 项目成员?
极狐GitLab 是 GitLab 在中国的发行版,关于中文参考文档和资料有: 极狐GitLab 中文文档极狐GitLab 中文论坛极狐GitLab 官网 项目成员 (BASIC ALL) 成员是有权访问您的项目的用户和群组。 每个成员都有一个角色,这决定了他们在项目中可以…...
2025年服务器技术全景解析:量子计算、液冷革命与未来生态构建
2025年服务器技术全景解析:量子计算、液冷革命与未来生态构建 一、量子计算:从实验室到产业化的跨越 1. 中国量子计算产业化突破 • 本源量子“悟空”超导计算机: 搭载72位自主超导量子芯片“悟空芯”,支持198个量子比特…...
Vue3+ Vite + Element-Plus + TypeScript 从0到1搭建
一环境准备 二vite 项目初始化 按照 🍃Vite 官方文档 - 搭建第一个 Vite 项目 说明,执行以下命令完成 vue 、typescirpt 模板项目的初始化 npm init vitelatest vue3-element-admin --template vue-tsvue3-element-admin: 自定义的项目名称 vue-ts &am…...
如何对 Redis 进行水平扩展和垂直扩展以应对微服务流量的增长?
核心概念: 垂直扩展 (Scale Up): 提升单个节点的性能。简单来说就是给现有的 Redis 服务器增加更多的 CPU 、内存、更快的存储(SSD)或更高的网络带宽。水平扩展 (Scale Out): 增加更多节点来分担负载。这意味着部署多个 Redis 实例ÿ…...
PyCharm 加载不了 conda 虚拟环境,不存在的
#工作记录 前言 在开发过程中,PyCharm 无法加载 Conda 虚拟环境是常见问题。 在不同情况下,“Conda 可执行文件路径”的指定可能会发生变化,不会一尘不变,需要灵活处置。 以下是一系列解决此问题的经验参考。 检查 Conda 安装…...
Matlab/Simulink的一些功能用法笔记(4)
水一篇帖子 01--MATLAB工作区的保护眼睛颜色设置 默认的工作区颜色为白色 在网上可以搜索一些保护眼睛的RGB颜色参数设置 在MATLAB中按如下设置: ①点击预设 ②点击颜色,点击背景色的三角标符号 ③点击更多颜色,找到RGB选项 ④填写颜色参数…...
OS7.【Linux】基本指令入门(6)
目录 1.zip和unzip 配置指令 使用 两个名词:打包和压缩 打包 压缩 Linux下的操作演示 压缩和解压缩文件 压缩和解压缩目录 -d选项 2.tar Linux下的打包和压缩方案简介 czf选项 xzf选项 -C选项 tzf选项 3.bc 4.uname 不带选项的uname -a选项 -r选项 -v选项…...
便捷OCR文字识别软件推荐
软件介绍 此次要介绍的是一款OCR识别软件。 核心功能及特点 这款小巧的OCR识别软件,功能简洁,操作方便,只需进行截图,随后就能自动识别文字内容。并且,它具备离线使用的特性,这一特点使得它非常适合在不联…...
【中间件】brpc_基础_栈管理
文章目录 BRPC bthread栈管理1 简介2 关键数据结构2.1 栈描述符 (bthread_stack_t)2.2 栈池 (StackPool) 3 核心操作3.1 栈分配 (bthread_stack_alloc)3.2 栈释放 (bthread_stack_dealloc)3.3 栈切换支持 4 性能优化5 安全性设计6 跨平台实现6.1 Linux6.2 Windows 7 应用场景8 …...
Linux 硬盘和光驱系统管理
一、硬盘与目录的容量 [rootwww ~]# df [-ahikHTm] [目录或档名] 选项与参数: -a :列出所有的档案系统,包括系统特有的 /proc 等档案系统; -k :以 KBytes 的容量显示各档案系统; -m :以 MByt…...
分库分表后复杂查询的应对之道:基于DTS实时性ES宽表构建技术实践
1 问题域 业务发展的初期,我们的数据库架构往往是单库单表,外加读写分离来快速的支撑业务,随着用户量和订单量的增加,数据库的计算和存储往往会成为我们系统的瓶颈,业界的实践多数采用分而治之的思想:分库…...
[三分钟]性能测试工具JMeter入门: 下载安装JMeter并设置中文;JMeter基本使用流程
文章目录 1.下载并打开JMeter2.设置JMeter中文3.JMeter基本使用流程 Apache JMeter 是 Apache 组织基于 Java 开发的压力测试工具。 JMeter 支持多种协议和技术,如 HTTP、HTTPS、FTP、JDBC、SOAP、REST、JMS 等。它不仅可以用于性能测试,还可以用于功能测…...
StableDiffusionWebUI的AI绘图AI绘视频详细使用教程+报错排坑
概述 这里是官方的最原始的体积最小的StableDiffusionWebUI的下载及其使用教程,已经帮你们把坑都排完了,本教程适合开发者、程序员自己折腾,源码体积只有1.8M。 从0安装到绘图 1.环境 Python与Git环境: 安装Python3.10.0 >…...
Flutter 合并 ‘dot-shorthands‘ 语法糖,Dart 开始支持交叉编译
最近在 Dart 在 main 3.9 合并了一项名为 「dot-shorthands」 的语法糖提议,该提议主要是为了简化开发过程中的相关静态固定常量的写法,通过上下文类型推断简化枚举值和静态成员的访问: 简单来说,就是在之前你可能需要写 SomeEnum…...