Seata源码—5.全局事务的创建与返回处理二
大纲
1.Seata开启分布式事务的流程总结
2.Seata生成全局事务ID的雪花算法源码
3.生成xid以及对全局事务会话进行持久化的源码
4.全局事务会话数据持久化的实现源码
5.Seata Server创建全局事务与返回xid的源码
6.Client获取Server的响应与处理的源码
7.Seata与Dubbo整合的过滤器源码
5.Seata Server创建全局事务与返回xid的源码
-> ServerHandler.channelRead()接收Seata Client发送过来的请求;
-> AbstractNettyRemoting.processMessage()处理RpcMessage消息;
-> ServerOnRequestProcessor.process()处理RpcMessage消息;
-> TransactionMessageHandler.onRequest()处理RpcMessage消息;
-> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {...@ChannelHandler.Sharableclass ServerHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}//接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理processMessage(ctx, (RpcMessage) msg);}}
}public abstract class AbstractNettyRemoting implements Disposable {...protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的//processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的//所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {if (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {...}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}...
}public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {private final RemotingServer remotingServer;...@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {try {if (LOGGER.isInfoEnabled()) {LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());}ctx.disconnect();ctx.close();} catch (Exception exx) {LOGGER.error(exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));}}}private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message = rpcMessage.getBody();//RpcContext线程本地变量副本RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());if (LOGGER.isDebugEnabled()) {LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());} else {try {BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());} catch (InterruptedException e) {LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);}}if (!(message instanceof AbstractMessage)) {return;}// the batch send request messageif (message instanceof MergedWarpMessage) {...} else {// the single send request messagefinal AbstractMessage msg = (AbstractMessage) message;//最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessageAbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);//返回响应给客户端remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}...
}
-> TransactionMessageHandler.onRequest()处理RpcMessage消息;
-> DefaultCoordinator.onRequest()处理RpcMessage消息;
-> GlobalBeginRequest.handle()处理开启全局事务请求;
-> AbstractTCInboundHandler.handle()开启全局事务返回全局事务;
-> DefaultCoordinator.doGlobalBegin()开启全局事务;
-> DefaultCore.begin()创建全局事务会话并开启;
-> GlobalSession.createGlobalSession()创建全局事务会话;
-> GlobalSession.begin()开启全局事务会话;
-> AbstractSessionManager.onBegin()
-> AbstractSessionManager.addGlobalSession()
-> AbstractSessionManager.writeSession()
-> TransactionStoreManager.writeSession()持久化全局事务会话;
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {...@Overridepublic AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {if (!(request instanceof AbstractTransactionRequestToTC)) {throw new IllegalArgumentException();}AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;transactionRequest.setTCInboundHandler(this);return transactionRequest.handle(context);}...
}public class GlobalBeginRequest extends AbstractTransactionRequestToTC {...@Overridepublic AbstractTransactionResponse handle(RpcContext rpcContext) {return handler.handle(this, rpcContext);}...
}public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTCInboundHandler.class);@Overridepublic GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {GlobalBeginResponse response = new GlobalBeginResponse();exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {@Overridepublic void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {try {//开启全局事务doGlobalBegin(request, response, rpcContext);} catch (StoreException e) {throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);}}}, request, response);return response;}...
}public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {private final DefaultCore core;...@Overrideprotected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {//接下来才真正处理开启全局事务的业务逻辑//其中会调用DefaultCore来真正开启一个全局事务,即拿到xid并设置到响应里去response.setXid(core.begin(rpcContext.getApplicationId(),//应用程序idrpcContext.getTransactionServiceGroup(),//事务服务分组request.getTransactionName(),//事务名称request.getTimeout())//超时时间);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());}}...
}public class DefaultCore implements Core {...@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {//创建一个全局事务会话GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);//通过slf4j的MDC把xid放入线程本地变量副本里去MDC.put(RootContext.MDC_KEY_XID, session.getXid());//添加一个全局事务会话的生命周期监听器session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//打开Session,其中会对全局事务会话进行持久化session.begin();//transaction start event,发布会话开启事件MetricsPublisher.postSessionDoingEvent(session, false);//返回全局事务会话的xidreturn session.getXid();}...
}public class GlobalSession implements SessionLifecycle, SessionStorable {...public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);return session;}public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {//全局事务id是通过UUIDGenerator来生成的this.transactionId = UUIDGenerator.generateUUID();this.status = GlobalStatus.Begin;this.lazyLoadBranch = lazyLoadBranch;if (!lazyLoadBranch) {this.branchSessions = new ArrayList<>();}this.applicationId = applicationId;this.transactionServiceGroup = transactionServiceGroup;this.transactionName = transactionName;this.timeout = timeout;//根据UUIDGenerator生成的transactionId + XID工具生成最终的xidthis.xid = XID.generateXID(transactionId);}@Overridepublic void begin() throws TransactionException {this.status = GlobalStatus.Begin;this.beginTime = System.currentTimeMillis();this.active = true;for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onBegin(this);}}...
}public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {...@Overridepublic void onBegin(GlobalSession globalSession) throws TransactionException {addGlobalSession(globalSession);}@Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {if (LOGGER.isDebugEnabled()) {LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);}writeSession(LogOperation.GLOBAL_ADD, session);}private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {//transactionStoreManager.writeSession()会对全局事务会话进行持久化if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {...}}...
}
-> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
-> AbstractNettyRemotingServer.sendAsyncResponse()异步发送响应;
-> AbstractNettyRemoting.buildResponseMessage()构造包含xid响应;
-> AbstractNettyRemoting.sendAsync()异步发送响应;
-> Netty的Channel.writeAndFlush()发送响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {...@Overridepublic void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {Channel clientChannel = channel;if (!(msg instanceof HeartbeatMessage)) {clientChannel = ChannelManager.getSameClientChannel(channel);}if (clientChannel != null) {RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE: ProtocolConstants.MSGTYPE_RESPONSE);super.sendAsync(clientChannel, rpcMsg);} else {throw new RuntimeException("channel is error.");}}...
}public abstract class AbstractNettyRemoting implements Disposable {...protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {RpcMessage rpcMsg = new RpcMessage();rpcMsg.setMessageType(messageType);rpcMsg.setCodec(rpcMessage.getCodec()); // same with requestrpcMsg.setCompressor(rpcMessage.getCompressor());rpcMsg.setBody(msg);rpcMsg.setId(rpcMessage.getId());return rpcMsg;}//rpc async request.protected void sendAsync(Channel channel, RpcMessage rpcMessage) {channelWritableCheck(channel, rpcMessage.getBody());if (LOGGER.isDebugEnabled()) {LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());}doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {destroyChannel(future.channel());}});}...
}
6.Client获取Server的响应与处理的源码
-> ClientHandler.channelRead()接收Seata Server返回的响应;
-> AbstractNettyRemoting.processMessage()处理RpcMessage消息;
-> ClientOnResponseProcessor.process()会设置MessageFuture结果;
-> MessageFuture.setResultMessage()设置MessageFuture结果;
-> CompletableFuture.complete()唤醒阻塞的线程;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Sharableclass ClientHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}processMessage(ctx, (RpcMessage) msg);}...}...
}public abstract class AbstractNettyRemoting implements Disposable {...protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的//processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的//所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {if (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {...}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}...
}public class ClientOnResponseProcessor implements RemotingProcessor {...@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (rpcMessage.getBody() instanceof MergeResultMessage) {...} else if (rpcMessage.getBody() instanceof BatchResultMessage) {...} else {//这里是对普通消息的处理MessageFuture messageFuture = futures.remove(rpcMessage.getId());if (messageFuture != null) {messageFuture.setResultMessage(rpcMessage.getBody());} else {if (rpcMessage.getBody() instanceof AbstractResultMessage) {if (transactionMessageHandler != null) {transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);}}}}}...
}public class MessageFuture {private transient CompletableFuture<Object> origin = new CompletableFuture<>();...//Sets result message.public void setResultMessage(Object obj) {origin.complete(obj);}...
}
由于Seata Client发送开启全局事务的请求给Seata Server时,会通过MessageFuture的get()方法同步等待Seata Server返回响应。所以当Seata Client获取Seata Server的响应并通过complete()方法设置MessageFuture已经完成后,原来同步等待Seata Server响应的线程便会继续往下处理。
即某线程执行CompletableFuture.complete()方法后,执行CompletableFuture.get()方法的线程就不会被阻塞而会被唤醒。
-> GlobalTransactionalInterceptor.invoke()
-> GlobalTransactionalInterceptor.handleGlobalTransaction()
-> TransactionalTemplate.execute()
-> TransactionalTemplate.beginTransaction()
-> DefaultGlobalTransaction.begin()
-> DefaultTransactionManager.begin()
-> DefaultTransactionManager.syncCall()
-> TmNettyRemotingClient.sendSyncRequest()
-> AbstractNettyRemotingClient.sendSyncRequest()发送请求;
-> AbstractNettyRemoting.sendSync()发送同步请求;
-> MessageFuture.get()会同步等待Seata Server的响应结果;
-> CompletableFuture.get()阻塞当前线程进行等待唤醒;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Overridepublic Object sendSyncRequest(Object msg) throws TimeoutException {//因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡String serverAddress = loadBalance(getTransactionServiceGroup(), msg);//获取RPC调用的超时时间long timeoutMillis = this.getRpcRequestTimeout();//构建一个RPC消息RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);//send batch message//put message into basketMap, @see MergedSendRunnable//默认是不开启批量消息发送if (this.isEnableClientBatchSendRequest()) {...} else {//通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel//然后通过网络连接Channel把RpcMessage发送出去Channel channel = clientChannelManager.acquireChannel(serverAddress);return super.sendSync(channel, rpcMessage, timeoutMillis);}}...
}public abstract class AbstractNettyRemoting implements Disposable {...protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {if (timeoutMillis <= 0) {throw new FrameworkException("timeout should more than 0ms");}if (channel == null) {LOGGER.warn("sendSync nothing, caused by null channel.");return null;}//把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);channelWritableCheck(channel, rpcMessage.getBody());//获取远程地址String remoteAddr = ChannelUtil.getAddressFromChannel(channel);doBeforeRpcHooks(remoteAddr, rpcMessage);//异步化发送数据,同时对发送结果添加监听器//如果发送失败,则会对网络连接Channel进行销毁处理channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());if (messageFuture1 != null) {messageFuture1.setResultMessage(future.cause());}destroyChannel(future.channel());}});try {//然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);doAfterRpcHooks(remoteAddr, rpcMessage, result);return result;} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}}...
}public class MessageFuture {private transient CompletableFuture<Object> origin = new CompletableFuture<>();...public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {Object result = null;try {result = origin.get(timeout, unit);if (result instanceof TimeoutException) {throw (TimeoutException)result;}} catch (ExecutionException e) {throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);} catch (TimeoutException e) {throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));}if (result instanceof RuntimeException) {throw (RuntimeException)result;} else if (result instanceof Throwable) {throw new RuntimeException((Throwable)result);}return result;}...
}
7.Seata与Dubbo整合的过滤器源码
(1)调用Dubbo过滤器的入口
(2)Seata与Dubbo整合的过滤器
(1)调用Dubbo过滤器的入口
-> GlobalTransactionalInterceptor.invoke()拦截添加了@GlobalTransactional注解的方法;
-> GlobalTransactionalInterceptor.handleGlobalTransaction()进行全局事务的处理;
-> TransactionalTemplate.execute()执行全局事务
-> TransactionalTemplate.beginTransaction()开启一个全局事务
-> handleGlobalTransaction().methodInvocation.proceed()真正执行目标方法
-> ApacheDubboTransactionPropagationFilter.invoke()经过Dubbo过滤器处理
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {...//如果调用添加了@GlobalTransactional注解的方法,就会执行如下invoke()方法@Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable {//methodInvocation是一次方法调用//通过methodInvocation的getThis()方法可以获取到被调用方法的对象//通过AopUtils.getTargetClass()方法可以获取到对象对应的ClassClass<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;//通过反射,获取到目标class中被调用的method方法Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);//如果调用的目标method不为nullif (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {//尝试寻找桥接方法bridgeMethodfinal Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//通过反射,获取被调用的目标方法的@GlobalTransactional注解final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);//通过反射,获取被调用目标方法的@GlobalLock注解final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//如果禁用了全局事务,或者开启了降级检查,同时降级次数大于了降级检查允许次数,那么localDisable就为true//localDisable为true则表示全局事务被禁用了,此时就不可以开启全局事务了boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);//如果全局事务没有禁用if (!localDisable) {//全局事务注解不为空,或者是AOP切面全局事务核心配置不为空if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {AspectTransactional transactional;if (globalTransactionalAnnotation != null) {//创建全局事务AOP切面的核心配置AspectTransactional,配置数据会从全局事务注解里提取出来transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),globalTransactionalAnnotation.name(),globalTransactionalAnnotation.rollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.noRollbackFor(),globalTransactionalAnnotation.noRollbackForClassName(),globalTransactionalAnnotation.propagation(),globalTransactionalAnnotation.lockRetryInterval(),globalTransactionalAnnotation.lockRetryTimes());} else {transactional = this.aspectTransactional;}//真正处理全局事务的入口return handleGlobalTransaction(methodInvocation, transactional);} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation, globalLockAnnotation);}}}//直接运行目标方法return methodInvocation.proceed();}//真正进行全局事务的处理Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable {boolean succeed = true;try {//基于全局事务执行模版TransactionalTemplate,来执行全局事务return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {//真正执行目标方法return methodInvocation.proceed();}...});} catch (TransactionalExecutor.ExecutionException e) {...} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}}...
}public class TransactionalTemplate {...public Object execute(TransactionalExecutor business) throws Throwable {//1.Get transactionInfoTransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}//1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.//根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务//刚开始在开启一个全局事务的时候,是没有全局事务的GlobalTransaction tx = GlobalTransactionContext.getCurrent();//1.2 Handle the transaction propagation.//从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED//也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务;Propagation propagation = txInfo.getPropagation();//不同的全局事务传播级别,会采取不同的处理方式//比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid//可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {...}//1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.if (tx == null) {tx = GlobalTransactionContext.createNew();}//set current tx config to holderGlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);try {//2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, else do nothing. Of course, the hooks will still be triggered.//开启一个全局事务beginTransaction(txInfo, tx);Object rs;try {//Do Your Business//执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务rs = business.execute();} catch (Throwable ex) {//3. The needed business exception to rollback.//发生异常时需要完成的事务completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}//4. everything is fine, commit.//如果一切执行正常就会在这里提交全局事务commitTransaction(tx);return rs;} finally {//5. clear//执行一些全局事务完成后的回调,比如清理等工作resumeGlobalLockConfig(previousConfig);triggerAfterCompletion();cleanUp();}} finally {//If the transaction is suspended, resume it.if (suspendedResourcesHolder != null) {//如果之前挂起了一个全局事务,此时可以恢复这个全局事务tx.resume(suspendedResourcesHolder);}}}//开启事务private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {//开启全局事务之前有一个回调的一个钩子名为triggerBeforeBegin()triggerBeforeBegin();//真正去开启一个全局事务tx.begin(txInfo.getTimeOut(), txInfo.getName());//开启全局事务之后还有一个回调钩子名为triggerAfterBegin()triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);}}...
}
(2)Seata与Dubbo整合的过滤器
如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形。如果线程本地变量副本里的xid为null,则对应于接收RPC调用的情形。
当RootContext的xid不为null时,需要设置RpcContext的xid。当RootContext的xid为null + RpcContext的xid不为null时,需要设置RootContext的xid。
@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100)
public class ApacheDubboTransactionPropagationFilter implements Filter {private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class);@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {//发起Dubbo的RPC调用时,会先从线程本地变量副本里获取xidString xid = RootContext.getXID();//然后从线程本地变量副本里获取当前的分支事务类型,默认分支类型就是ATBranchType branchType = RootContext.getBranchType();//从RpcContext里获取attachments里的xid和分支类型String rpcXid = getRpcXid();String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE);if (LOGGER.isDebugEnabled()) {LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid);}boolean bind = false;if (xid != null) {//如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形//则把线程本地变量副本里的xid和分支类型,设置到RpcContext上下文里//RpcContext上下文里的attachment内容会随着RPC请求发送到其他系统中RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name());} else {//如果线程本地变量副本里的xid为null且RpcContext里的xid不为null,对应于接收RPC调用的情形if (rpcXid != null) {//把RpcContext里的xid绑定到当前服务的线程本地变量副本里RootContext.bind(rpcXid); if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {RootContext.bindBranchType(BranchType.TCC);}bind = true;if (LOGGER.isDebugEnabled()) {LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType);}}}try {return invoker.invoke(invocation);} finally {if (bind) {BranchType previousBranchType = RootContext.getBranchType();//对线程本地变量副本里的xid做解绑String unbindXid = RootContext.unbind(); if (BranchType.TCC == previousBranchType) {RootContext.unbindBranchType();}if (LOGGER.isDebugEnabled()) {LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType);}if (!rpcXid.equalsIgnoreCase(unbindXid)) {LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid, rpcBranchType != null ? rpcBranchType : "AT", previousBranchType);if (unbindXid != null) {RootContext.bind(unbindXid);LOGGER.warn("bind xid [{}] back to RootContext", unbindXid);if (BranchType.TCC == previousBranchType) {RootContext.bindBranchType(BranchType.TCC);LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType);}}}}//对RpcContext上下文里的东西进行解绑RpcContext.getContext().removeAttachment(RootContext.KEY_XID);RpcContext.getContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);RpcContext.getServerContext().removeAttachment(RootContext.KEY_XID);RpcContext.getServerContext().removeAttachment(RootContext.KEY_BRANCH_TYPE);}}private String getRpcXid() {String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);if (rpcXid == null) {rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase());}return rpcXid;}
}
相关文章:
Seata源码—5.全局事务的创建与返回处理二
大纲 1.Seata开启分布式事务的流程总结 2.Seata生成全局事务ID的雪花算法源码 3.生成xid以及对全局事务会话进行持久化的源码 4.全局事务会话数据持久化的实现源码 5.Seata Server创建全局事务与返回xid的源码 6.Client获取Server的响应与处理的源码 7.Seata与Dubbo整合…...
mac-M系列芯片安装软件报错:***已损坏,无法打开。推出磁盘问题
因为你安装的软件在Intel 或arm芯片的mac上没有签名导致。 首先打开任何来源操作 在系统设置中配置,如下图: 2. 然后打开终端,输入: sudo spctl --master-disable然后输入电脑锁屏密码 打开了任何来源,还遇到已损坏…...
端到端自动驾驶系统实战指南:从Comma.ai架构到PyTorch部署
引言:端到端自动驾驶的技术革命 在自动驾驶技术演进历程中,端到端(End-to-End)架构正引领新一轮技术革命。不同于传统分模块处理感知、规划、控制的方案,端到端系统通过深度神经网络直接建立传感器原始数据到车辆控制…...
MoveIt Setup Assistant 在导入urdf文件的时候报错
在使用MoveIt Setup Assistant导入urdf文件的时候(load a urdf or collada robot model),找到urdf文件后MoveIt Setup Assistant闪退并报错: Warning: Ignoring XDG_SESSION_TYPEwayland on Gnome. Use QT_QPA_PLATFORMwayland to run on Wayland anyway…...
uniapp +vue +springboot多商家订餐系统
uniapp vue springboot多商家订餐系统,这个系统我整理调试的多商家,多用户的,多端小程序订餐系统,主要包含了uniapp小程序端,管理后台页面vue端,后台功能接口Springboot端,源码齐全,…...
docker迅雷自定义端口号、登录用户名密码
在NAS上部署迅雷,确实会带来很大的方便。但是目前很多教程都是讲怎么部署docker迅雷,鲜有将自定义配置的方法。这里讲一下怎么部署,并重点讲一下支持的自定义参数。 一、部署docker 在其他教程中,都是介绍的如下命令,…...
联想笔记本黑屏了,排线出问题还是静电
以下引用 联想电脑屏幕不亮,电源键和键盘灯均正常的解决办法(超简单)_拯救者屏幕不亮,键盘有电-CSDN博客 昨天正常关机后,今天一早来工位打开电脑,美美开始玩手机。 一会之后抬头屏幕是黑的,还以为自动息…...
uniapp -- uCharts 仪表盘刻度显示 0.9999999 这样的值问题处理。
文章目录 🍉问题🍉解决方案🍉问题 在仪表盘上,23.8变成了 23.799999999999997 🍉解决方案 formatter格式化问题 1:在 config-ucharts.js 或 config-echarts.js 配置对应的 formatter 方法 formatter: {yAxisDemo1: function (...
为 Spring Boot 应用程序构建 CI/CD 流水线
为 Spring Boot 应用程序创建构建/部署流水线涉及多个步骤,而 Jenkins 可以作为强大的工具来自动化这些流程。在本教程中,我们将指导您为托管在 GitHub 上的 Spring Boot 应用程序设置流水线,使用 Jenkins 构建该应用程序,并将其部…...
数值分析填空题速通
填空题速通 文章目录 填空题速通误差与误差传播均差插值与误差范数、赋范线性空间与内积、内积空间范数代数精度数值微分积分误差迭代方程与收敛阶微分方程数值解法的迭代公式与阶 误差与误差传播 例 设 a 1.414 a 1.414 a1.414, b − 0.576 b -0.576 b−0.57…...
day016-系统负载压力测试-磁盘管理
文章目录 1. 系统负载2. 模拟系统高负载2.1 模拟cpu负载2.2 模拟IO负载 3. 磁盘接口分类4. 思维导图 1. 系统负载 系统负载是衡量系统繁忙程度的指标负载值接近或超过cpu核心总数表示系统负载高负载高常见原因:1.占用cpu过多导致2.占用磁盘IO过多导致(I…...
DeepSeek指令微调与强化学习对齐:从SFT到RLHF
后训练微调的重要性 预训练使大模型获得丰富的语言和知识表达能力,但其输出往往与用户意图和安全性需求不完全匹配。业内普遍采用三阶段训练流程:预训练 → 监督微调(SFT)→ 人类偏好对齐(RLHF)。预训练阶段模型在大规模语料上学习语言规律;监督微调利用人工标注的数据…...
安全性(一):加密算法总结
一、加密算法分类总览 加密类型关键特性代表算法主要用途对称加密加解密使用同一个密钥DES、3DES、AES、SM4数据加密传输、存储非对称加密公钥加密,私钥解密(或反向)RSA、DSA、ECC、SM2密钥交换、数字签名、身份认证哈希算法不可逆摘要MD5、…...
DeepSeek 赋能军事:重塑现代战争形态的科技密码
目录 一、引言:AI 浪潮下的军事变革与 DeepSeek 崛起二、DeepSeek 技术原理与特性剖析2.1 核心技术架构2.2 独特优势 三、DeepSeek 在军事侦察中的应用3.1 海量数据快速处理3.2 精准目标识别追踪3.3 预测潜在威胁 四、DeepSeek 在军事指挥决策中的应用4.1 战场态势实…...
我司助力高校打造「智慧创新AI学习中心」
为推动AI教育融合跨领域应用,东吴大学于2025年4月举行「智慧创新AI学习中心」揭牌仪式,并宣布正式启动AI特色课程与教学空间建置计画。此次建置由我司协助整体教室空间与设备规划,导入最新NVIDIA GeForce RTX 50系列桌上型电脑,并…...
AI赋能把“杂多集合”转化为“理想集合”的数学建模与认知升级
AI赋能把“杂多集合”转化为“理想集合”的数学建模与认知升级 一、核心概念定义 杂多集合(Chaotic Set) 定义:元素间关系模糊、结构无序的集合 数学表达:C{x∣x∈X,P(x)},其中 P(x) 是模糊隶属函数 实例…...
NVC++ 介绍与使用指南
文章目录 NVC 介绍与使用指南NVC 简介安装 NVC基本使用编译纯 C 程序编译 CUDA C 程序 关键编译选项示例代码使用标准并行算法 (STDPAR)混合 CUDA 和 C 优势与限制优势限制 调试与优化 NVC 介绍与使用指南 NVC 是 NVIDIA 提供的基于 LLVM 的 C 编译器,专为 GPU 加速…...
Redis 事务与管道:原理、区别与应用实践
在现代分布式系统开发中,Redis 作为高性能的内存数据库,其事务处理和管道技术是开发者必须掌握的核心知识点。本文将深入探讨 Redis 事务和管道的实现原理、使用场景、性能差异以及最佳实践,帮助开发者根据实际需求选择合适的技术方案。 一、…...
Git 多人协作
目录 情景一 情景二 合并分支 情景一 目标:远程 master 分支下的 file.txt 文件新增代码 "aaa","bbb"。 实现:由开发者1新增 "aaa" ,开发者2新增 bbb。 条件:在一个分支下合作完成。 针对以上情景我们要注意…...
Unity 人物模型学习笔记
一、关于模型的检查 拿到人物模型时,检查人物: 位置信息是否在0点布线/UV是否正常身体各部分是否分开各部分命名是否清晰骨骼需要绑定 二、Unity人物动画 https://www.bilibili.com/video/BV1cc41197mF?spm_id_from333.788.recommend_more_video.-1&a…...
【和春笋一起学C++】(十四)指针与const
将const用于指针,有两种情况: const int *pt; int * const pt; 目录 1. const int *pt 2. int * const pt 3. 扩展 1. const int *pt 首先看第一种情况,const在int的前面,有如下语句: int peoples12࿱…...
AI知识梳理——RAG、Agent、ReAct、LangChain、LangGraph、MCP、Function Calling、JSON-RPC
AI技术I AI技术II RAG 📌 高度凝练表达 RAG (检索增强生成)是一种结合信息检索与生成式人工智能的技术框架,旨在提升大型语言模型(LLM)的输出准确性和实用性。通过在生成响应前引入外部知识库的信息&#…...
ModuleNotFoundError: No module named ‘SDToolbox‘
(py311) C:>python Python 3.11.11 | packaged by Anaconda, Inc. | (main, Dec 11 2024, 16:34:19) [MSC v.1929 64 bit (AMD64)] on win32 Type “help”, “copyright”, “credits” or “license” for more information. from SDToolbox import PostShock_eq Tracebac…...
在宝塔中使用.NET环境管理部署 .NET Core项目
本次采用的演示环境未腾讯云轻量级服务器,使用应用模板,选择宝塔liunx面板。 一、登录宝塔安装环境 直接选择免密登录 进入腾讯云webshell后,输入bt,选择14 选择网站选择.NET项目安装.NET环境管理安装对应的.NET 版本 注意&…...
【DAY21】 常见的降维算法
内容来自浙大疏锦行python打卡训练营 浙大疏锦行 目录 PCA主成分分析 t-sne降维 线性判别分析 (Linear Discriminant Analysis, LDA) 作业: 什么时候用到降维 降维的主要应用场景 知识点回顾: PCA主成分分析t-sne降维LDA线性判别 通常情况下,…...
Linux面试题集合(3)
一秒刷新一次某个进程的状况 top -d 1 -p pid ’显示pid为1、2、3的进程的状况 top -p 1,2,3(按上键选择某个进程) 强制杀死进程 kill -9 pid 说一下ps和top命令的区别 ps命令只能显示执行瞬间的进程状态 top命令实时跟进进程状态 你在工作中什么情况下…...
Pytorch实现常用代码笔记
Pytorch实现常用代码笔记 基础实现代码其他代码示例Network ModulesLossUtils 基础实现代码 参考 深度学习手写代码 其他代码示例 Network Modules Pytorch实现Transformer代码示例 Loss PyTorch实现CrossEntropyLoss示例 Focal Loss 原理详解及 PyTorch 代码实现 PyTorc…...
vscode vue 项目 css 颜色调色版有两个
vue 项目 css 颜色调色版有两个,不知道是哪个插件冲突了。 这个用着很别扭,一个个插件删除后发现是 Vue - Official 这个插件问题,删了就只有一个调色版了。...
MySQL刷题相关简单语法集合
去重 distinct 关键字 eg. :select distinct university from user_profile 返回行数限制: limit关键字 eg. :select device_id from user_profile limit 2 返回列重命名:as 关键字 eg.:select device_id as user_in…...
MySQL多条件查询深度解析
一、业务场景引入 在数据分析场景中,我们经常会遇到需要从多个维度筛选数据的需求。例如,某教育平台运营团队希望同时查看"山东大学"的所有学生以及所有"男性"用户的详细信息,包括设备ID、性别、年龄和GPA数据ÿ…...
RT Thread FinSH(msh)调度逻辑
文章目录 概要FinSH功能FinSH调度逻辑细节小结 概要 RT-Thread(Real-Time Thread)作为一款开源的嵌入式实时操作系统,在嵌入式设备领域得到了广泛应用。 该系统不仅具备强大的任务调度功能,还集成了 FinSH命令行系统,…...
安装nerdctl和buildkitd脚本命令
#!/bin/bash set -euo pipefail # 检查是否以root权限运行 if [ "$(id -u)" -ne 0 ]; then echo "错误:请使用root权限或sudo运行本脚本" >&2 exit 1 fi # 检测openEuler系统(兼容大小写) detect_distrib…...
HTTP与HTTPS协议的核心区别
HTTP与HTTPS协议的核心区别 数据传输安全性 HTTP采用明文传输,数据易被窃听或篡改(如登录密码、支付信息),而HTTPS通过SSL/TLS协议对传输内容加密,确保数据完整性并防止中间人攻击。例如,HTTPS会生成对称加…...
51单片机仿真突然出问题
最近发现仿真出问题了,连最简单的程序运行结果都不对,比如,左移位<<,如果写P1 << 1;则没有问题,但写成P1 << cnt;就不对(cnt已经定义过,而且赋了初值&…...
(C语言)超市管理系统 (正式版)(指针)(数据结构)(清屏操作)(文件读写)(网页版预告)(html)(js)(json)
目录 前言: 源代码: product.h product.c fileio.h fileio.c main.c json_export.h json_export.c tasks.json idex.html script.js 相关步骤: 第一步: 第二步: 第三步: 第四步: 第五步…...
uni-app小程序登录后…
前情 最近新接了一个全新项目,是类似商城的小程序项目,我负责从0开始搭建小程序,我选用的技术栈是uni-app技术栈,其中就有一个用户登录功能,小程序部分页面是需要登录才可以查看的,对于未登录的用户需要引…...
从零开始理解Jetty:轻量级Java服务器的入门指南
目录 一、Jetty是什么?先看一个生活比喻 二、5分钟快速入门:搭建你的第一个Jetty服务 步骤1:Maven依赖配置 步骤2:编写简易Servlet(厨房厨师) 步骤3:组装服务器(餐厅开业准备&am…...
如何免费在线PDF转换成Excel
咱们工作中是不是经常遇到这种头疼事儿?辛辛苦苦从别人那里拿到PDF文件,想改个数据调个格式,结果发现根本没法直接编辑! 数据被困住:PDF表格无法直接计算/筛选,手动录入太反人类! 格式大崩坏&…...
StarRocks MCP Server 开源发布:为 AI 应用提供强大分析中枢
过去,开发者要让大模型(LLM)使用数据库查询数据,往往需要开发专属插件、设计复杂的接口或手动构建 Prompt,这不仅费时费力,而且很难在不同模型之间复用。StarRocks MCP Server 提供了一个“通用适配器”接口…...
Vue百日学习计划Day21-23天详细计划-Gemini版
总目标: 在 Day 21-23 完成 Vue.js 的介绍学习、环境搭建,并成功运行第一个 Vue 3 项目,理解其基本结构。 Day 21: Vue.js 介绍与概念理解 (~3 小时) 本日目标: 理解 Vue.js 是什么、渐进式框架的概念以及选择 Vue 的原因。初步了解 Vite 是什么及其作用…...
JS逆向-某易云音乐下载器
文章目录 介绍下载链接Robots文件搜索功能JS逆向**函数a:生成随机字符串****函数b:AES-CBC加密****函数c:RSA公钥加密** 歌曲下载总结 介绍 在某易云音乐中,很多歌曲听是免费的,但下载需要VIP,此程序旨在“…...
Qt与Hid设备通信
什么是HID? HID(Human Interface Device)是直接与人交互的电子设备,通过标准化协议实现用户与计算机或其他设备的通信,典型代表包括键盘、鼠标、游戏手柄等。 为什么HID要与qt进行通信? 我这里的应…...
QT使用QXlsx读取excel表格中的图片
前言 读取excel表格中的图片的需求比较小众,QXlsx可以操作excel文档,进行图片读取、插入操作,本文主要分享单独提取图片和遍历表格提取文字和图片。 源码下载 github 开发环境准备 把下载的代码中的QXlsx目录,整个拷贝到所创建…...
二叉树进阶
一、二叉搜索树 1.二叉搜索树的概念 二叉搜索树又称二叉排序树,它也可以是一棵空树,或是具备以下性质的树: 1.1 若它的左子树不为空,则它左子树上所有节点的值都小于根节点的值。 1.2 若它的右子树不为空,则它右子…...
腾讯 CodeBuddy 杀入 AI 编程赛道,能否撼动海外工具霸主地位?
在 AI 编程助手领域,海外的 Cursor 等工具风头正劲,如今腾讯带着 CodeBuddy 隆重登场,国产 AI 编程助手能否借其之力崛起?让我们一探究竟。 官网: 腾讯云代码助手 CodeBuddy - AI 时代的智能编程伙伴 实战安装教程 …...
项目QT+ffmpeg+rtsp(二)——海康威视相机测试
文章目录 前言一、验证RTSP地址的有效性1.1 使用VLC播放器验证1.2 使用FFmpeg命令行验证1.3 使用Python代码检查网络连接1.4 检查摄像头Web界面1.5 使用RTSP客户端工具二、关于IPV4的地址2.1 原来2.1.1 原因2.2 解决2.3 显示前言 昨晚拿到一个海康威视的相机,是连接上了交换机…...
vscode用python开发maya联动调试设置
如何在VScode里编写Maya Python脚本_哔哩哔哩_bilibili1 包括1,maya的python全面在vscode支持,2,通过mayacode发送到maya,3同步调试 import maya.cmds as cmds 1、让 maya.cmds编译通过 下载Autodesk_Maya_2018_6_Update_DEVK…...
Postman遇到脚本不支持replaceIn函数
目录: 1、问题现象2、代码处理3、执行结果 1、问题现象 2、代码处理 function replaceVariables(template) {// 获取所有变量(环境变量全局变量)const variables pm.environment.toObject();const globalVars pm.globals.toObject();const…...
精益数据分析(64/126):移情阶段的用户触达策略——从社交平台到精准访谈
精益数据分析(64/126):移情阶段的用户触达策略——从社交平台到精准访谈 在创业的移情阶段,精准找到目标用户并开展深度访谈是验证需求的关键。今天,我们结合《精益数据分析》中的方法论,探讨如何利用Twit…...
turn.js与 PHP 结合使用来实现 PDF 文件的页面切换效果
将 Turn.js 与 PHP 结合使用来实现 PDF 文件的页面切换效果,你需要一个中间步骤将 PDF 转换为 Turn.js 可以处理的格式(如 HTML 页面或图片)。以下是实现这一功能的步骤和示例代码: 步骤 1: 安装必要的库 首先,你需要…...