当前位置: 首页 > news >正文

RocketMQ实现基于可靠消息的最终一致性

RocketMQ实现基于可靠消息的最终一致性

文章目录

  • RocketMQ实现基于可靠消息的最终一致性
    • 一、RocketMQ应用场景
      • **应用解耦**
      • **流量削峰**
      • **数据分发**
    • 二、RocketMQ 基础概念
      • 1. 核心组件
      • 2. 消费模式
      • 3. 消息可靠性
    • 三、消息类型
      • 按发送方式分
        • 同步发送
        • 异步发送
        • 单向发送
      • 按使用功能特点分
        • **普通消息(订阅)**
        • **顺序消息**
        • **延时消息——订单超时库存归还**
        • **事务消息**
    • **RocketMQ实现基于可靠消息最终一致性**
    • **业务逻辑**
      • **订单服务**
      • **库存服务**

一、RocketMQ应用场景

在当今分布式系统中,消息队列已经成为服务解耦、流量削峰、异步处理的重要组成部分。无论是订单系统、库存系统,还是支付、日志系统,消息队列都能发挥巨大的作用。

RocketMQ 是阿里开源的一个分布式消息中间件,具有高吞吐、低延迟、高可用、可扩展等优点,非常适合企业级应用。

应用解耦

系统的耦合性越高,容错性就越低。举例:用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统除了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

将原本的“直接调用”改为“发送消息”,由消息队列来转发处理流程。

过程:

  1. 用户提交订单,请求到“订单系统”。
  2. 订单系统在本地完成事务后,发送消息到消息队列,例如 Kafka、RabbitMQ、RocketMQ 等。
  3. 其他系统(库存、支付、物流)各自订阅对应的消息,当消息到来时再异步处理:
  • 库存系统:接收到“订单创建”消息后扣减库存;
  • 支付系统:接收到消息后创建支付请求;
  • 物流系统:准备发货。

优点:

  1. 某个系统挂了,不影响主流程,下单流程仍可继续,消息会保存在队列中,系统恢复后可继续消费
  2. 新增业务只需订阅消息,不改动订单系统逻辑
  3. 通过 MQ 中转,系统之间不直接调用,降低耦合度

流量削峰

消息队列可将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提高系统的稳定性和用户体验。当系统负载超过阈值,使用消息队列将请求缓存起来(而不是直接组织用户请求),等待系统处理完毕后通知用户下单完毕。

数据分发

通过消息队列可以让数据在多个系统间流通,数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中获取数据即可。

二、RocketMQ 基础概念

1. 核心组件

  • Producer:消息生产者,负责发送消息。
  • Consumer:消息消费者,负责接收消息。
  • Broker:消息中转站,存储和转发消息(邮局)。
  • NameServer:注册中心,维护 Broker 路由信息(各个邮局的管理机构)。
  • Topic:区分消息的种类,一个发送者可以发送消息给一个或者多个Topic,一个消息的接收者可以订阅一个或者多个Topic消息
  • Message Queue:相当于Topic的分区,用于并行发送和接受消息

2. 消费模式

  • Push 模式:Broker 主动推送消息给 Consumer。
  • Pull 模式:Consumer 主动轮询 Broker 获取消息。

3. 消息可靠性

RocketMQ 支持消息的持久化存储、消息确认机制、消息重试,保证消息在分布式环境下的可靠传递。

三、消息类型

按发送方式分

同步发送
  1. 同步发送,线程阻塞,投递completes阻塞结束

  2. 如果发送失败,会在默认的超时时间3秒内重试,最多重试2次

  3. 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功

  4. SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值

  5. retry的实现原理,只有ack的SendStatus=SEND_OK才会停止retry(发送同步消息且Ack为SEND_OK,只代表消息成功的写入了MQ当中,并不代表该消息成功被消费)

异步发送
  1. 当前线程一定要等待异步线程毁掉结束再关闭producer,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开
  2. 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry
  3. 异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,比如用户视频上传后通知启动转码服务,转码完成后通知转码结果
单向发送
  1. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
  2. 此方式发送消息的过程耗时非常短,一般在微妙级别image-20250420172809113

按使用功能特点分

普通消息(订阅)

生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景

顺序消息

哪条消息先进入,那条消息就先被消费。分为全局顺序消息和分区顺序消息

全局顺序消息:所有消息都在同一个队列中(相当于只有一个分区的分区顺序消息)(代价大,并发性能低)

分区顺序消息:相同业务标识的消息始终发送到同一个队列,从而保证该业务内消息有序(并发性好)

延时消息——订单超时库存归还

延时的机制是在服务端实现的,也及时Broker收到了消息,但是经过一段时间以后才发送

事务消息

RocketMQ 分布式事务是一种 最终一致性事务模型,流程分为三步:

  1. 发送预消息(half message):消息不会被消费。
  2. 执行本地事务:如创建订单、扣库存。
  3. 返回事务状态给 MQ:
  • 成功 → MQ 提交消息,被消费者消费;
  • 失败 → MQ 回滚消息,不再消费;
  • 超时未回复(未知状态) → RocketMQ 调用 CheckLocalTransaction 查询事务状态。

RocketMQ实现基于可靠消息最终一致性

微服务间事务的问题:1.磁盘满了,服务宕机 2.代码出现异常,直接崩溃停止 3.网络拥塞或抖动

先扣减库存,再新建订单:

  1. 本地执行失败,则调用库存归还接口

  2. 非本地执行失败,而是代码异常程序崩溃,此时不知道本地执行情况

先创建订单,后扣减库存:

将库存服务加入到本地事务中,先在本地进行订单相关业务处理,最后扣减库存成功后才提交整体事务,否则回滚。

问题:

  1. 扣减库存请求发送出去,但是网络拥塞,则会多次发送请求重试,实际多次扣减库存。
  2. 本地订单服务向库存服务发送请求,发送后本地订单服务宕机,本地订单事务不会提交,但是库存成功扣减。或者网络拥塞造成实际库存服务扣减成功,但是本地服务以为扣减失败,本地事务进行回滚,但是库存还是扣减了。

image-20250420173359428

可靠消息要确保:消息一定能被消费者成功消费

  • 积分服务:积分没有上限,加积分一定可以成功
  • 库存服务:库存不能为负数,所以扣减库存允许失败

项目中的最终解决方案:

image-20250420173426035

RocketMQ 的事务消息用于处理“消息可靠发送 + 本地事务一致性”场景。发送事务消息后,可以:

  • 执行本地事务
  • 根据本地事务的执行结果来提交、回滚或挂起(未知)事务消息
  • 若返回未知,RocketMQ 会定时回查消息的事务状态

业务逻辑

订单服务

新建订单方法:

使用RocketMQ初始化事务监听器和事务消息生产者,将订单信息发送出去,指定topic为库存归还(预先发送库存归还消息,如果本地事务执行成功则回滚消息,如果本地事务执行失败则提交消息)

发送消息后,RocketMQ会自动调用事务监听器的方法,执行本地事务

func (*OrderServer) CreateOrder(ctx context.Context, req *proto.OrderRequest) (*proto.OrderInfoResponse, error) {// 初始化订单事务监听器orderListener := OrderListener{Ctx: ctx}// 创建事务消息生产者p, err := rocketmq.NewTransactionProducer(&orderListener,producer.WithNameServer([]string{"192.168.26.2:9876"}),)if err != nil {zap.S().Errorf("生成producer失败: %s", err.Error())return nil, err}// 启动事务消息if err = p.Start(); err != nil {zap.S().Errorf("启动producer失败: %s", err.Error())return nil, err}// 构造订单信息order := model.OrderInfo{User:         req.UserId,OrderSn:      GenerateOrderSn(req.UserId),Address:      req.Address,SignerName:   req.Name,SingerMobile: req.Mobile,Post:         req.Post,}jsonString, _ := json.Marshal(order)// 发送事务消息(订单信息)(预先发送库存归还消息,如果本地事务执行成功则回滚消息,如果本地事务执行失败则提交消息)_, err = p.SendMessageInTransaction(context.Background(),primitive.NewMessage("order_reback", jsonString),)if err != nil {fmt.Printf("发送失败: %s\n", err)return nil, status.Error(codes.Internal, "发送消息失败")}// 判断事务监听器中的状态码是否为成功,如果不成功(本地事务执行失败)就返回错误if orderListener.Code != codes.OK {return nil, status.Error(orderListener.Code, orderListener.Detail)}return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
}

事务监听器:

实现事务监听接口(此接口有两个方法:执行本地事务和检查本地事务)的结构体(方法调用者),在其中加入自定义字段,存储接口中的方法结束后,需要返回的信息(直接将信息存储到方法调用者中的相应字段里,以便在新建订单方法中可以直接查看)

由于接口中的方法,返回的是primitive.LocalTransactionState(事务状态:提交或回滚或未知),所以在执行完本地事务后,其它信息字段不能直接返回,需要存储到方法调用者结构体中

type OrderListener struct {Code        codes.Code // 返回的grpc中的codeDetail      string     // 返回的信息ID          int32      // 订单IDOrderAmount float32    // 订单总金额Ctx         context.Context
}

执行本地事务方法:

  1. 从购物车中获取到选中的商品

  2. 商品价格查询 - 访问商品服务 (跨微服务)

  3. 库存扣减 - 访问库存服务 (跨微服务)。扣减库存之前,如果出错则回滚消息撤销库存归还;扣减库存之后,如果出错,则提交消息进行库存归还

  4. 订单的基本信息表订单的商品信息表 中插入数据

  5. 从购物车中删除已购买的记录

  6. 发送延时消息:每创建一个订单,都将该订单信息作为延时消息发送出去,指定topic为订单超时(本意:假设订单30分钟后超时,那么创建一个订单30分钟后,就该对此订单进行检查,判断此订单是否已支付,防止用户下单后一直不支付占用库存)

func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {/*新建订单时的本地事务1. 从购物车中获取到选中的商品2. 商品价格查询 - 访问商品服务 (跨微服务)3. 库存扣减 - 访问库存服务 (跨微服务)4. 向 订单的基本信息表 和 订单的商品信息表 中插入数据5. 从购物车中删除已购买的记录*/var orderInfo model.OrderInfo// 将订单信息序列化到结构体中_ = json.Unmarshal(msg.Body, &orderInfo)parentSpan := opentracing.SpanFromContext(o.Ctx)var goodsIds []int32                  // 购物车中选中的商品id集合(为了通过id集合获取商品信息)var shopCarts []model.ShoppingCart    // 购物车中该用户选中的商品集合goodsNumsMap := make(map[int32]int32) // 购物车种该用过户选中的商品id和商品数量shopCartSpan := opentracing.GlobalTracer().StartSpan("select_shopcart", opentracing.ChildOf(parentSpan.Context()))// 1.去购物车表中查找,该用户选中的商品集合if result := global.DB.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Find(&shopCarts); result.RowsAffected == 0 {// 如果没有选中结算的商品,则返回错误信息o.Code = codes.InvalidArgumento.Detail = "没有选中结算的商品"// 这时本地事务出错,但还没有进行库存扣减,所以不需要调库存归还的消息,所以回滚消息return primitive.RollbackMessageState}shopCartSpan.Finish()// 2.封装商品id集合,和 商品id:商品数量 的mapfor _, shopCart := range shopCarts {goodsIds = append(goodsIds, shopCart.Goods)goodsNumsMap[shopCart.Goods] = shopCart.Nums}queryGoodsSpan := opentracing.GlobalTracer().StartSpan("query_goods", opentracing.ChildOf(parentSpan.Context()))// 3.调用商品微服务,批量获取商品信息(根据商品id集合)goods, err := global.GoodsSrvClient.BatchGetGoods(context.Background(), &proto.BatchGoodsIdInfo{Id: goodsIds})if err != nil {o.Code = codes.Internalo.Detail = "批量查询商品信息失败"// 这时本地事务出错,但还没有进行库存扣减,所以不需要调库存归还的消息,所以回滚消息return primitive.RollbackMessageState}queryGoodsSpan.Finish()var orderAmount float32                // 订单总金额var orderGoods []*model.OrderGoods     // 订单商品详情集合var goodsInvInfo []*proto.GoodsInvInfo // 订单商品数量集合// 4.遍历商品信息集合,封装 订单总金额 和 订单商品详情集合 和 订单商品数量集合for _, good := range goods.Data {// 订单总金额(封装到订单详情中,用于向订单表中插入数据):商品单价*商品数量orderAmount += good.ShopPrice * float32(goodsNumsMap[good.Id])// 订单中的商品详情集合(用于向订单商品表中插入数据)(这里没有填充Order订单号字段,后续填充)orderGoods = append(orderGoods, &model.OrderGoods{Goods:      good.Id,GoodsName:  good.Name,GoodsImage: good.GoodsFrontImage,GoodsPrice: good.ShopPrice,Nums:       goodsNumsMap[good.Id],})// 订单中的商品数量集合(用于扣减库存)goodsInvInfo = append(goodsInvInfo, &proto.GoodsInvInfo{GoodsId: good.Id,Num:     goodsNumsMap[good.Id],})}queryInvSpan := opentracing.GlobalTracer().StartSpan("query_inv", opentracing.ChildOf(parentSpan.Context()))// 5.调用库存服务,扣减库存(根据订单号和商品数量详情集合)if _, err = global.InventorySrvClient.Sell(context.Background(), &proto.SellInfo{OrderSn: orderInfo.OrderSn, GoodsInfo: goodsInvInfo}); err != nil {// 对错误进行判断处理st, _ := status.FromError(err)// 如果是业务逻辑错误if st.Code() == codes.InvalidArgument || st.Code() == codes.Internal || st.Code() == codes.ResourceExhausted {o.Code = st.Code()o.Detail = "扣减库存失败"// 这时扣减库存服务出错,库存服务内部进行数据库事务回滚,所以还没有进行库存扣减,不需要调库存归还的消息,所以回滚消息return primitive.RollbackMessageState}// TODO如果是网络问题(实际调用成功,但是因为网络问题返回失败)}// 运行到这里,库存已经扣减成功。这里是分界线,分界线前执行失败则回滚事务消息,分界线后执行失败则提交事务消息queryInvSpan.Finish()// 6.向订单表中插入订单// 开启数据库事务tx := global.DB.Begin()orderInfo.OrderMount = orderAmountsaveOrderSpan := opentracing.GlobalTracer().StartSpan("save_order", opentracing.ChildOf(parentSpan.Context()))if result := tx.Save(&orderInfo); result.RowsAffected == 0 {// 数据库事务回滚tx.Rollback()o.Code = codes.Internalo.Detail = "创建订单失败"// 提交消息,进行库存归还return primitive.CommitMessageState}saveOrderSpan.Finish()o.OrderAmount = orderAmounto.ID = orderInfo.ID// 7.遍历订单中的商品详情集合(填充orderGoods中的Order订单号字段)for _, orderGood := range orderGoods {orderGood.Order = orderInfo.ID}saveOrderGoodsSpan := opentracing.GlobalTracer().StartSpan("save_order_goods", opentracing.ChildOf(parentSpan.Context()))// 8.向订单商品表中批量插入订单商品集合if result := tx.CreateInBatches(orderGoods, 100); result.RowsAffected == 0 {tx.Rollback() // 回滚事务o.Code = codes.Internalo.Detail = "批量插入订单商品失败"// 提交消息,进行库存归还return primitive.CommitMessageState}saveOrderGoodsSpan.Finish()// 9.删除购物车中选中的商品deleteShopCartSpan := opentracing.GlobalTracer().StartSpan("delete_shopcart", opentracing.ChildOf(parentSpan.Context()))if result := tx.Where(&model.ShoppingCart{User: orderInfo.User, Checked: true}).Delete(&model.ShoppingCart{}); result.RowsAffected == 0 {tx.Rollback() // 回滚事务o.Code = codes.Internalo.Detail = "删除购物车记录失败"// 提交消息,进行库存归还return primitive.CommitMessageState}deleteShopCartSpan.Finish()// 10.发送延时消息p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.26.2:9876"}))if err != nil {panic("生成producer失败")}// 不要在一个进程中使用多个producer,不要随便调用shutdown因为会影响其他的producerif err = p.Start(); err != nil {panic("启动producer失败")}// 向order_timeout订单超时的topic中发送延时消息(订单信息)msg = primitive.NewMessage("order_timeout", msg.Body)msg.WithDelayTimeLevel(3) // 延时10秒(上线后设置为30分钟即level16)_, err = p.SendSync(context.Background(), msg)if err != nil {zap.S().Errorf("发送延时消息失败: %v\n", err)tx.Rollback() // 回滚事务o.Code = codes.Internalo.Detail = "发送延时消息失败"return primitive.CommitMessageState}// 11.提交事务// 先提交数据库事务tx.Commit()o.Code = codes.OK// 本地事务执行成功,回滚事务消息return primitive.RollbackMessageState
}

检查本地事务方法:

查找订单表中是否有此订单,如果订单表中没有此订单,说明本地事务执行失败,所以直接提交消息;如果存在此订单,说明本地事务执行成功,所以回滚消息(缺陷:订单表中没有此订单,不代表本地事务已经将库存扣减了,可能还没扣减前就已经出错了,这是提交消息会造成库存多扣减(没有保证幂等性))

func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {var orderInfo model.OrderInfo_ = json.Unmarshal(msg.Body, &orderInfo)// 查找订单表if result := global.DB.Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&orderInfo); result.RowsAffected == 0 {// 如果订单表中没有此订单,说明本地事务执行失败,所以直接提交消息// TODO缺陷:订单表中没有此订单,不代表本地事务已经将库存扣减了,可能还没扣减前就已经出错了,这是提交消息会造成库存多扣减(没有保证幂等性)return primitive.CommitMessageState // 不能说明这里就是库存已经扣减了}// 如果存在此订单,说明本地事务执行成功,所以回滚消息return primitive.RollbackMessageState
}

main函数:

创建一个消费者,订阅topic为订单超时,指定处理函数为OrderTimeout

// 创建一个RocketMQ消费者,处理订单超时的消息c, _ := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.26.2:9876"}),consumer.WithGroupName("mxshop-order"),)// 订阅了名为 "order_timeout" 的主题,使用了默认的消息选择器,指定了 handler.OrderTimeout 作为消息处理函数,if err := c.Subscribe("order_timeout", consumer.MessageSelector{}, handler.OrderTimeout); err != nil {fmt.Println("读取消息失败")}_ = c.Start()

OrderTimeout函数:

  1. 逐个订单进行处理,先查询订单的支付状态,如果是“交易成功”则无需处理
  2. 如果不是“交易成功”,说明订单没有支付,但是已经超时,所以进行库存归还
  3. 将订单状态修改为“交易结束”
  4. 更新订单表中此订单的订单状态
  5. 创建一个RocketMQ生产者,向topic库存归还发送普通消息
func OrderTimeout(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {// 遍历所有消息中的订单for i := range msgs {// 将订单信息反序列化var orderInfo model.OrderInfo_ = json.Unmarshal(msgs[i].Body, &orderInfo)fmt.Printf("获取到订单超时消息: %v\n", time.Now())// 查询订单的支付状态var order model.OrderInfoif result := global.DB.Model(model.OrderInfo{}).Where(model.OrderInfo{OrderSn: orderInfo.OrderSn}).First(&order); result.RowsAffected == 0 {// 如果订单表中没有此订单,则无需操作return consumer.ConsumeSuccess, nil}// 如果订单表中有此订单if order.Status != "TRADE_SUCCESS" {// 如果订单的状态不是交易成功,说明订单没有支付,但是已经超时,所以进行库存归还tx := global.DB.Begin()// 修改订单的状态为交易结束order.Status = "TRADE_CLOSED"// 更新订单表中此订单的订单状态tx.Save(&order)// 创建一个生产者p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.26.2:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}// 发送消息:将此订单消息发送出去进行库存归还,指定topic为库存归还// 在inventory_srv/main.go中创建消费者,从此topic中消费消息,进行库存归还_, err = p.SendSync(context.Background(), primitive.NewMessage("order_reback", msgs[i].Body))if err != nil {// 回滚数据库事务tx.Rollback()fmt.Printf("发送失败: %s\n", err)return consumer.ConsumeRetryLater, nil}return consumer.ConsumeSuccess, nil}}// 如果订单的状态是交易成功,则不用库存归还return consumer.ConsumeSuccess, nil
}

库存服务

main函数:

创建一个消费者,订阅topic为库存归还,指定处理函数为AutoReback

// 创建一个消费者c, _ := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.26.2:9876"}),consumer.WithGroupName("mxshop-inventory"),)// 订阅了名为 "order_reback" 的主题,使用了默认的消息选择器,指定了 handler.AutoReback 作为消息处理函数if err := c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {fmt.Println("读取消息失败")}_ = c.Start()

AutoReback函数:

逐个订单进行处理,将库存进行归还并将库存状态设置为2已归还

func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {type OrderInfo struct {OrderSn string // 订单号}// 遍历消息,需要库存归还的订单for i := range msgs {// 重复归还的问题,接口应该确保幂等性,不能因为消息的重复发送导致一个订单的库存归还多次,没有扣减的库存不归还// 解决方式:新建一张表,这张表记录了详细的订单扣减细节,以及归还细节var orderInfo OrderInfoerr := json.Unmarshal(msgs[i].Body, &orderInfo)if err != nil {zap.S().Errorf("解析json失败: %v\n", msgs[i].Body)return consumer.ConsumeSuccess, nil}// 开启数据库事务tx := global.DB.Begin()var sellDetail model.StockSellDetail// 查询StockSellDetail表中,此订单号并且库存状态为1已扣减的订单if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn, Status: 1}).First(&sellDetail); result.RowsAffected == 0 {// 如果未查询到,说明库存已归还return consumer.ConsumeSuccess, nil}// 如果查询到则逐个商品归还库存for _, orderGood := range sellDetail.Detail {// sql:update stocks set stocks = stocks + orderGood.Num where goods = orderGood.Goodsif result := tx.Model(&model.Inventory{}).Where(&model.Inventory{Goods: orderGood.Goods}).Update("stocks", gorm.Expr("stocks+?", orderGood.Num)); result.RowsAffected == 0 {tx.Rollback()return consumer.ConsumeRetryLater, nil}}// 更新StockSellDetail表中,此订单号的库存状态为2已归还if result := tx.Model(&model.StockSellDetail{}).Where(&model.StockSellDetail{OrderSn: orderInfo.OrderSn}).Update("status", 2); result.RowsAffected == 0 {tx.Rollback()return consumer.ConsumeRetryLater, nil}tx.Commit()return consumer.ConsumeSuccess, nil}return consumer.ConsumeSuccess, nil
}// 确认扣减库存
func (*InventoryServer) ConfirmSell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {client := goredislib.NewClient(&goredislib.Options{Addr: "192.168.26.2:6379",})pool := goredis.NewPool(client)rs := redsync.New(pool)tx := global.DB.Begin()for _, goodInfo := range req.GoodsInfo {var inv model.InventoryNew//if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods:goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {//	tx.Rollback() //回滚之前的操作//	return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")//}//for {mutex := rs.NewMutex(fmt.Sprintf("goods_%d", goodInfo.GoodsId))if err := mutex.Lock(); err != nil {return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")}if result := global.DB.Where(&model.Inventory{Goods: goodInfo.GoodsId}).First(&inv); result.RowsAffected == 0 {tx.Rollback() //回滚之前的操作return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")}//判断库存是否充足if inv.Stocks < goodInfo.Num {tx.Rollback() //回滚之前的操作return nil, status.Errorf(codes.ResourceExhausted, "库存不足")}//扣减, 会出现数据不一致的问题 - 锁,分布式锁inv.Stocks -= goodInfo.Num // 对库存进行扣减inv.Freeze -= goodInfo.Num // 对冻结库存进行恢复tx.Save(&inv)if ok, err := mutex.Unlock(); !ok || err != nil {return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")}}tx.Commit() // 需要自己手动提交操作//m.Unlock() //释放锁return &emptypb.Empty{}, nil
}

相关文章:

RocketMQ实现基于可靠消息的最终一致性

RocketMQ实现基于可靠消息的最终一致性 文章目录 RocketMQ实现基于可靠消息的最终一致性一、RocketMQ应用场景**应用解耦****流量削峰****数据分发** 二、RocketMQ 基础概念1. 核心组件2. 消费模式3. 消息可靠性 三、消息类型按发送方式分同步发送异步发送单向发送 按使用功能特…...

【题解-Acwing】790. 数的三次方根

题目:790. 数的三次方根 题目描述 给定一个浮点数 n,求它的三次方根。 输入 共一行,包含一个浮点数 n 。 输出 共一行,包含一个浮点数,表示问题的解。 注意,结果保留 6 位小数。 数据范围 −10000 ≤ n ≤ 10000 时空限制 1s / 64MB 输入样例 1000.00输出样…...

一键升级OpenSSH/OpenSSL修复安全漏洞

在服务器安全运维过程中&#xff0c;我们经常面临这样的问题&#xff1a;收到高危漏洞通报&#xff08;如最近的OpenSSH多个CVE漏洞&#xff09;&#xff0c;但Ubuntu系统无法通过apt直接升级到修复版本。这种情况下&#xff0c;传统方法需要手动编译源码&#xff0c;处理依赖关…...

Pycharm 如何删除某个 Python Interpreter

在PyCharm中&#xff0c;点击右下角的“Interpreter Settings”按钮&#xff0c;或者通过菜单栏选择“File” > “Settings”&#xff08;macOS用户选择“PyCharm” > “Preferences”&#xff09;。在设置窗口中&#xff0c;导航到“Project: [Your Project Name]” >…...

【C++】深入浅出之多态

目录 多态的概念多态的定义和实现多态的构造条件虚函数虚函数的重写虚函数重写的两个例外协变析构函数作为虚函数重写 C11的override和final重载、重写(覆盖)、隐藏(重定义)的对比相关面试题⭐ 抽象类概念接口继承和实现继承 多态的原理虚函数表多态的原理动态绑定和静态绑定 e…...

精益数据分析(9/126):如何筛选创业路上的关键数据指标

精益数据分析&#xff08;9/126&#xff09;&#xff1a;如何筛选创业路上的关键数据指标 大家好&#xff01;在创业的漫漫长路中&#xff0c;数据就像一盏明灯&#xff0c;指引着我们前行的方向。但要让这盏灯发挥作用&#xff0c;关键在于找到那些真正有价值的数据指标。今天…...

【Python爬虫详解】第二篇:HTML结构的基本分析

在上一篇文章中&#xff0c;我们介绍了网络爬虫的基本概念、发展历程和工作原理。要进行有效的网页内容爬取&#xff0c;首先需要理解我们要爬取的对象 —— 网页的基本结构和语法。网页本质上是由HTML代码构成的&#xff0c;爬虫程序需要从HTML中提取我们需要的信息。因此&…...

【C++】 —— 笔试刷题day_21

一、爱丽丝的人偶 题目解析 现在存在n个玩偶&#xff0c;每个玩偶的身高是1、2、3......n&#xff1b; 现在我们要对这些玩偶进行排序&#xff08;如果x人偶&#xff0c;它左右两边的玩偶一个比x高、一个比x矮&#xff0c;那这个玩偶就会爆炸&#xff09;。 我们不想要任何一个…...

云点数据读写

一、常见点云数据格式 LAS/LAZ格式 LAS是点云数据的行业标准格式 LAZ是LAS的压缩版本 支持地理参考信息、颜色、强度等属性 PCD格式(Point Cloud Data) PCL(Point Cloud Library)开发的格式 支持ASCII和二进制存储 包含头部信息和数据部分 PLY格式(Polygon File Format…...

Matlab 汽车行驶速度PID控制系统仿真

1、内容简介 Matlab 213-汽车行驶速度PID控制系统仿真 可以交流、咨询、答疑 2、内容说明 略 3、仿真分析 略 4、参考论文 略...

STM32嵌入式

一、创建工程项目 1、进入软件首页 2、新建项目,【file】->【new project】 3、选择需要的芯片 4、系统内核部分设置 ① 选择晶振&#xff08;使用外部的高速晶振&#xff09; ② 选择debug形式&#xff08;SW类型&#xff09; 5、时钟设置 6、选择自己需要的引脚设置&a…...

机器学习(神经网络基础篇)——个人理解篇6(概念+代码)

1 在声明一个类中&#xff0c;构建一个属于类的函数&#xff0c;前面为什要加上“self”&#xff1f; 就像下面这一串代码&#xff1a; class TwoLayerNet:def __init__(self, input_size, hidden_size, output_size,weight_init_std0.01):# 初始化权重self.params {}self.p…...

Java学习手册:Filter 和 Listener

在 JavaWeb 开发中&#xff0c;Filter&#xff08;过滤器&#xff09;和 Listener&#xff08;监听器&#xff09;是两个重要的技术组件&#xff0c;它们在处理客户端请求、管理应用状态和资源以及实现全局逻辑控制等方面发挥着关键作用。 一、Filter&#xff08;过滤器&#…...

深度学习总结(25)

抽样偏倚问题 非代表性数据有一个特别隐蔽又特别常见的例子&#xff0c;那就是抽样偏倚&#xff08;sampling bias&#xff09;​。如果你的数据收集过程与你尝试预测的目标之间存在相互影响&#xff0c;就会出现抽样偏倚&#xff0c;从而导致有偏差的结果。 理解数据 将数据…...

探索 Model Context Protocol (MCP):它如何影响 AI 的表现?

Anthropic 公司 Anthropic 是一家技术实力雄厚的公司&#xff0c;也是大模型领域的重要参与者之一。其开发的 **Claude 模型** 是全球首个以编程能力见长并广受欢迎的大语言模型。这款模型凭借卓越的代码生成和理解能力&#xff0c;迅速成为许多开发者工具的核心组件。例如&am…...

Three.js + React 实战系列-3D 个人主页 :完成 Navbar 导航栏组件

在上一节中&#xff0c;我们搭建了项目的基础结构&#xff0c;搭建好了项目框架。 本节我们将继续完善页面结构&#xff0c;完成第一个视觉组件 —— Navbar 导航栏 ✅ 前置准备&#xff1a; ✅下载静态资源在根目录下 (src 同级)谷歌云盘地址 &#x1f3a5; 02 完成 Navba…...

游戏引擎学习第238天:让 OpenGL 使用我们的屏幕坐标

回顾并为今天的内容做准备 我们已经完成了硬件显示的实现&#xff0c;现在通过GPU来显示游戏。原本以为这会花费很长时间&#xff0c;但结果实际所需的时间并不多。因此&#xff0c;我们现在有了进展&#xff0c;但接下来应该做什么还不确定。虽然有很多事情可以做&#xff0c…...

go+mysql+cocos实现游戏搭建

盲目的学了一段时间了&#xff0c;刚开始从Box2d开始学习&#xff0c;明白了很多&#xff0c;Box2d是物理模型的基础&#xff0c;是我们在游戏中模拟现实的很重要的一个开源工具。后来在朋友的建议下学习了cocos&#xff0c;也是小程序开发的利器&#xff0c;而golang是一款高效…...

Linux 网络基础(二) (传输协议层:UDP、TCP)

目录 一、传输层的意义 二、端口号 1、五元组标识一个通信 2、端口号范围划分 3、知名端口号&#xff08;Well-Know Port Number&#xff09; &#xff08;1&#xff09;查看端口号 4、绑定端口号数目问题 5、pidof & netstat 命令 &#xff08;1&#xff09;ne…...

Vue常用指令入门

1. v-for 作用&#xff1a;用于遍历对象或数组 注意&#xff1a;需要提供key属性&#xff0c;可以提高性能和避免渲染错误&#xff0c;值通常为index或item.id <li v-for"(item, index) in items" :key"index">{{ item }} </li>2. v-if,v-el…...

【文献阅读】EndoNet A Deep Architecture for Recognition Tasks on Laparoscopic Videos

关于数据集的整理 Cholec80 胆囊切除手术视频数据集介绍 https://zhuanlan.zhihu.com/p/700024359 数据集信息 Cholec80 数据集 是一个针对内窥镜引导 下的胆囊切除手术视频流程识别数据集。数据集提供了每段视频中总共7种手术动作及总共7种手术工具的标注&#xff0c;标…...

UML统一建模

UML UML&#xff08;统一建模语言&#xff09;介绍 UML&#xff08;统一建模语言&#xff09;介绍 面向对象软件开发需要经过OOA面向对象分析、OOD面向对象设计和OOP面向对象编程三个阶段。OOA对目标系统进行分析并寄哪里分析模型&#xff0c;并将之文档化&#xff0c;OOD用面向…...

Ubuntu下安装和卸载MySQL

Ubuntu下安装和卸载MySQL 下面的演示系统版本&#xff1a;Ubuntu 24.04 更新系统软件包 在开始安装之前&#xff0c;建议先更新系统的软件包列表&#xff0c;以确保所有依赖项是最新的。 sudo apt update && sudo apt upgrade -y安装MySQL服务器 Ubuntu的官方软件…...

物联网技术赋能:复杂环境下的能源数据零丢失

安科瑞顾强 在全球能源挑战日益严峻的背景下&#xff0c;高效节能已成为各行业的核心诉求。无论是商业综合体、工业厂房还是公共设施&#xff0c;如何实现能源的精细化管理成为关键课题。安科瑞能耗云平台凭借其创新技术与多功能服务&#xff0c;为企业提供了一站式能源管理解…...

卷积神经网络综述

摘要 本文对卷积神经网络&#xff08;Convolutional Neural Network&#xff0c;CNN&#xff09;进行了全面综述。首先介绍了卷积神经网络的发展历程&#xff0c;包括早期的理论基础和关键突破。接着详细阐述了卷积神经网络的结构组成&#xff0c;包括卷积层、池化层、全连接层…...

SpringBoot3设置maven package直接打包成二进制可执行文件

注意事项 SpringBoot普通native打包顺序clean compile spring-boot:process-aot native:compile 使用以下配置只会的打包顺序clean package&#xff08;注意&#xff1a;使用此配置以后打包会有编译后的class文件、jar包、original源文件、二进制可执行文件【Linux是无后缀的包…...

在 Anaconda 上安装多版本 Python 解释器并在 PyCharm 中配置

默认已安装好 Anaconda 和 PyCharm &#xff0c;想在 Anaconda 上安装最新版本的 Python 解释器。 一、在 Anaconda 上创建虚拟环境 在连网状态下进入系统的命令提示符&#xff08;快捷键&#xff1a;win r &#xff0c;输入 cmd 即可&#xff09;&#xff0c;输入如下命令&a…...

AES (高级加密标准)

原理详解 AES是一种对称加密算法&#xff0c;使用相同的密钥进行加密和解密。它采用替代-置换网络(SPN)结构&#xff0c;主要步骤包括&#xff1a; 密钥扩展&#xff1a;从初始密钥派生多轮密钥 初始轮&#xff1a;AddRoundKey&#xff08;轮密钥加&#xff09; 主轮&#xff…...

Git拉分支技巧:从零开始创建并推送分支

Git拉分支技巧&#xff1a;从零开始创建并推送分支 在团队协作开发中&#xff0c;Git 分支管理是不可或缺的技能。合理地创建、同步和推送分支&#xff0c;不仅能提高开发效率&#xff0c;还能避免代码冲突。本文将基于以下技巧&#xff0c;详细讲解如何从零开始创建并推送一个…...

线性回归之归一化(normalization)

文章目录 归一化与梯度下降归一化的必要性&#xff1a;从特征量纲到梯度下降问题背景矛盾与低效归一化的作用 归一化提高模型精度的原因归一化的本质常见归一化方法最大值最小值归一化示例说明优缺点分析 标准归一化具体机制示例说明 强调 归一化与梯度下降 归一化与梯度下降 &…...

mac监控linux上mysql性能(Grafana+Prometheus+mysqld_exporter)

一、监控查看端安装 Grafana安装-CSDN博客 普罗米修斯Prometheus监控安装&#xff08;mac&#xff09;-CSDN博客 1.启动Grafana服务 brew services start grafana 打开浏览器输入http://localhost:3000进入grafana登录页面 &#xff08;默认用户名和密码都为admin,进入后…...

【玩泰山派】MISC(杂项)- linux桌面环境

文章目录 linux桌面环境linux四大桌面环境概述ubuntu基于四大桌面环境的版本 显示管理器gdm3&#xff08;GNOME Display Manager&#xff09;lightdm&#xff08;Lightweight Display Manager&#xff09;SDDM&#xff08;Simple Desktop Display Manager&#xff09;KDM&#…...

MVCC介绍

MVCC&#xff08;多版本并发控制&#xff09;详解 MVCC&#xff08;Multi-Version Concurrency Control&#xff09; 是一种数据库并发控制技术&#xff0c;核心思想是通过维护数据的多个版本来实现读写操作的无锁并发&#xff0c;从而在高并发场景下提升性能。它广泛用于 MyS…...

神经网络与模型训练过程笔记

1.专有名词 ANN 人工神经网络&#xff0c;一种受生物神经元启发的监督学习算法。输入数据通过网络中的层级函数传递&#xff0c;激活特定神经元。函数复杂度越高&#xff0c;模型对数据的拟合能力越强&#xff0c;预测精度越高。 偏置项 其中x下表从1开始的是输入变量&#xf…...

ASP.NET 0~1学习

变量 string username Request["id"]; 声明并初始化一个字符串变量 username 数据类型 下面列出了常用的数据类型&#xff1a; 类型描述实例int整数&#xff08;全数字&#xff09;103, 12, 5168float浮点数3.14, 3.4e38decimal十进制数字&#xff08;高精度&a…...

optool为macho文件增加动态库

对macho文件有一定理解后,了解下optool是如何给macho文件增加动态库等功能的 optool 源码 环境 macOS 13.4 (22F66) Xcode 14.3.1 0x0 编译 下载源码 $ git clone --recurse-submodules https://github.com/alexzielenski/optool.git修改下Deployment Target,比如改成11.0&…...

【C++】类和对象之日期类的实现(构造、运算符重载)

文章目录 一、日期类要实现的函数二、函数实现1、GetMonthDay获取对应月份的天数2、CheckDate检查日期3、Date构造函数4、Print打印日期5、运算符重载1. 、、-、-2. 前置/--、后置/--3. 两个日期类相减&#xff08;求相差日期&#xff09; 6、比较7、流插入、流提取&#xff0…...

【Rust 精进之路之第9篇-所有权·核心】规则与移动 (Move):Rust 内存安全基石详解

系列: Rust 精进之路:构建可靠、高效软件的底层逻辑 作者: 码觉客 发布日期: 2025年4月20日 引言:没有 GC,Rust 如何管好内存?答案是所有权! 在我们的 Rust 探索之旅中,我们已经学习了变量、数据类型、控制流、函数和强大的构建工具 Cargo。现在,我们将踏入 Rust 最…...

【任务调度】xxl-job入门

xxl- job 入门 附上笔者写的测视示例&#xff1a;chenmeng-test-demos/demo8-task/task-xxl-job at master cmty256/chenmeng-test-demos 官方文档 XXL-JOB官网 源码仓库地址&#xff1a; Github&#xff1a;https://github.com/xuxueli/xxl-job Gitee&#xff1a;http://g…...

Go语言--语法基础4--基本数据类型--浮点数类型

3 、浮点数类型 浮点型用于表示包含小数点的数据&#xff0c;比如 1.234 就是一个浮点型数据。 Go 语言中的浮点类型采用 IEEE-754 标准的表达方式。 float32 精度是小数点后 7 位 float64 精度是小数点后 15 位。 1. 浮点数表示 Go 语言定义了两个类型 float32 和 floa…...

秘密任务 3.0:如何通过 JWT 认证确保 WebSockets 安全

在之前的文章中&#xff0c;我们探讨了如何通过 WebSockets DTOs 设计实时操作。现在&#xff0c;我们迎来了一项新的挑战&#xff1a;确保 WebSocket 通信在任务执行过程中保持安全。如果敌方潜伏在我们的实时通信渠道中&#xff0c;机密情报可能会被泄露。 任务&#xff1a…...

UID和GID的区别

UID&#xff08;用户标识符&#xff09;和 GID&#xff08;组标识符&#xff09;是 Linux/Unix 系统中用于管理用户和组权限的核心机制&#xff0c;它们的区别主要体现在作用对象和用途上&#xff1a; 目录 1. 定义与作用对象 2. 主要用途 3. 系统保留范围 4. 用户与组的关…...

【网络】通过Samba实现Window挂在Linux服务器路径

有时候我们去进行内网部署时&#xff0c;会遇到客户或者甲方爸爸说&#xff0c;需要将Linux中的某个路径共享出去到Window上&#xff0c;挂载出比如Z:\这种盘符。通过打开Z盘&#xff0c;来查看服务器的指定目录下的数据。 步骤1&#xff1a; 在Linux中安装samba yum install…...

UE5 UI 教程系列全集

https://www.youtube.com/TheRoyalSkies/search?queryUnreal-5%20UI...

论文笔记(七十八)Do generative video models understand physical principles?

Do generative video models understand physical principles? 文章概括Physics-IQ基准数据集评估协议为什么要创建一个真实世界的Physics-IQ数据集模型物理理解的评估指标动作发生在哪里&#xff1f;空间IoU&#xff08;Spatial IoU&#xff09;动作在哪里、何时发生&#xf…...

Viper配置管理笔记

一、什么是 Viper&#xff1f; Viper 是 Go 语言的一个强大工具&#xff0c;就像一个超级管家&#xff0c;专门负责帮你打理程序的各种配置。它能把配置文件&#xff08;比如 JSON、YAML、TOML 等格式&#xff09;里的内容读出来&#xff0c;还能监控配置文件的变化&#xff0…...

visual studio无法跳转到函数定义、变量定义、跳转函数位置不准问题解决

参考&#xff1a;https://blog.csdn.net/snakehacker/article/details/135438353 程序有时会出现大部分函数都不能准确的从头文件中正确定位到函数定位,这是因为数据库错乱造成的,可以通过重构数据库来解决,操作方法如下&#xff1a; 菜单栏&#xff1a;工具——选项 文本编辑…...

【Rust 精进之路之第15篇-枚举 Enum】定义、变体与数据关联:表达多种可能性

系列: Rust 精进之路:构建可靠、高效软件的底层逻辑 作者: 码觉客 发布日期: 2025年4月20日 引言:当值拥有“选项”——超越结构体的表达力 在上一篇【结构体 Struct】中,我们学习了如何使用结构体将多个相关的数据字段组合成一个有意义的整体。结构体非常适合表示那些…...

C++ 相关系统软件简介与学习方法【最水的一期】

C 作为一种强大的C 相关系统软件简介编程语言&#xff0c;广泛应用于系统软件开发领域。以下为你介绍几款基于 C 开发的典型系统软件及其特点&#xff1a; 操作系统内核 部分操作系统内核采用 C 开发&#xff0c;例如某些嵌入式操作系统。C 的高性能、底层硬件访问能力和强大的…...

【Linux我做主】GDB调试工具完全指南

Linux下GDB调试工具完全指南&#xff1a;25个核心命令详解与实战示例 github地址 有梦想的电信狗 前言 GDB&#xff08;GNU Debugger&#xff09;是Linux开发中不可或缺的调试工具&#xff0c;尤其在定位代码逻辑错误和内存问题时表现卓越。本文基于实际开发经验&#xff0…...