当前位置: 首页 > news >正文

Seata源码—3.全局事务注解扫描器的初始化二

大纲

1.全局事务注解扫描器继承的父类与实现的接口

2.全局事务注解扫描器的核心变量

3.Spring容器初始化后初始化Seata客户端的源码

4.TM全局事务管理器客户端初始化的源码

5.TM组件的Netty网络通信客户端初始化源码

6.Seata框架的SPI动态扩展机制源码

7.向Seata客户端注册网络请求处理器的源码

8.Seata客户端的定时调度任务源码

9.Seata客户端初始化Netty Bootstrap的源码

10.Seata客户端的寻址机制与连接服务端的源码

11.RM分支事务资源管理器客户端初始化的源码

12.全局事务注解扫描器扫描Bean是否有Seata注解

13.Seata全局事务拦截器的创建和初始化

14.基于Spring AOP创建全局事务动态代理的源码

15.全局事务注解扫描器的初始化总结

7.向Seata客户端注册网络请求处理器的源码

(1)向Seata客户端注册网络请求处理器

(2)初始化Seata客户端的Netty网络服务器

(1)向Seata客户端注册网络请求处理器

这些网络请求处理器主要就是:对事务协调者进行响应的处理器和心跳消息处理器。

public class TMClient {public static void init(String applicationId, String transactionServiceGroup) {init(applicationId, transactionServiceGroup, null, null);}public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);tmNettyRemotingClient.init();}
}public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...private final AtomicBoolean initialized = new AtomicBoolean(false);@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {getClientChannelManager().reconnect(transactionServiceGroup);}}}private void registerProcessor() {//1.registry TC response processor,对事务协调者进行响应的处理器ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);//2.registry heartbeat message processor,心跳消息处理器ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}...
}public class ClientOnResponseProcessor implements RemotingProcessor {//The Merge msg map from io.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMapprivate Map<Integer, MergeMessage> mergeMsgMap;//The Futures from io.seata.core.rpc.netty.AbstractNettyRemoting#futuresprivate final ConcurrentMap<Integer, MessageFuture> futures;//To handle the received RPC message on upper levelprivate final TransactionMessageHandler transactionMessageHandler;public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap, ConcurrentHashMap<Integer, MessageFuture> futures, TransactionMessageHandler transactionMessageHandler) {this.mergeMsgMap = mergeMsgMap;this.futures = futures;this.transactionMessageHandler = transactionMessageHandler;}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Overridepublic void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);}...
}public abstract class AbstractNettyRemoting implements Disposable {...//This container holds all processors.protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);...
}

(2)初始化Seata客户端的Netty网络服务器

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;...@Overridepublic void init() {//启动一个定时任务,每隔10s对tx分组发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}

8.Seata客户端的定时调度任务源码

Seata客户端在初始化时会启动两个定时任务:

一.每隔10s对Seata服务端发起一个重连接

二.每隔3秒检查发送的请求是否响应超时

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;...@Overridepublic void init() {//启动一个定时任务,每隔10s对Seata服务端发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}public abstract class AbstractNettyRemoting implements Disposable {//The Timer executor. 由单个线程进行调度的线程池protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));//Obtain the return result through MessageFuture blocking.protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();protected volatile long nowMills = 0;private static final int TIMEOUT_CHECK_INTERVAL = 3000;...public void init() {//启动一个定时任务,每隔3秒检查发送的请求是否响应超时timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();if (future.isTimeout()) {futures.remove(entry.getKey());RpcMessage rpcMessage = future.getRequestMessage();future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());}}}nowMills = System.currentTimeMillis();}}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}
}

9.Seata客户端初始化Netty Bootstrap的源码

基于Netty的API构建一个Bootstrap:

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...private final AtomicBoolean initialized = new AtomicBoolean(false);@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//找到长连接管理器,对事务服务分组发起连接请求getClientChannelManager().reconnect(transactionServiceGroup);}}}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private NettyClientChannelManager clientChannelManager;private ExecutorService mergeSendExecutorService;private final NettyClientBootstrap clientBootstrap;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;...@Overridepublic void init() {//启动一个定时任务,每隔10s对Seata服务端发起一个重连接timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否启用客户端批量发送请求,默认是falseif (this.isEnableClientBatchSendRequest()) {mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//启动Seata客户端的Netty网络服务器clientBootstrap.start();}...
}public class NettyClientBootstrap implements RemotingBootstrap {private final NettyClientConfig nettyClientConfig;private final Bootstrap bootstrap = new Bootstrap();private final EventLoopGroup eventLoopGroupWorker;private EventExecutorGroup defaultEventExecutorGroup;private ChannelHandler[] channelHandlers;...public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup, NettyPoolKey.TransactionRole transactionRole) {this.nettyClientConfig = nettyClientConfig;int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();this.transactionRole = transactionRole;this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));this.defaultEventExecutorGroup = eventExecutorGroup;}@Overridepublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));}//基于Netty的API构建一个Bootstrap//设置好对应的NioEventLoopGroup线程池组,默认1个线程就够了this.bootstrap.group(this.eventLoopGroupWorker).channel(nettyClientConfig.getClientChannelClazz()).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());if (nettyClientConfig.enableNative()) {if (PlatformDependent.isOsx()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("client run on macOS");}} else {bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);}}//对Netty网络通信数据处理组件pipeline进行初始化bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();//IdleStateHandler,空闲状态检查Handler//如果有数据通过就记录一下时间//如果超过很长时间没有数据通过,即处于空闲状态,那么就会触发一个user triggered event出去给ClientHandler来进行处理pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds()))//基于Seata通信协议的编码器.addLast(new ProtocolV1Decoder())//基于Seata通信协议的解码器.addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");}}...
}

10.Seata客户端的寻址机制与连接服务端的源码

(1)获取服务端地址的寻址机制

(2)Seata客户端发起与服务端的连接

(1)获取服务端地址的寻址机制

Seata客户端获取Seata服务端地址的方法是Netty长连接管理器NettyClientChannelManager的getAvailServerList()方法。

在getAvailServerList()方法中,首先会通过SPI机制获取注册中心服务实例,也就是注册中心工厂RegistryFactory会根据SPI机制构建出Seata的注册中心服务RegistryService的实例,然后再通过注册中心服务实例RegistryService的lookup()方法获取地址。

比如SPI获取到的注册中心服务实例是FileRegistryServiceImpl。那么其lookup()方法就会根据事务服务分组名称到file.conf里去找,找到映射的名字如default,然后根据default找到Seata服务端的地址列表。

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {...@Overridepublic void init() {//registry processor,注册一些请求处理器//由于Seata Server是可以主动给Seata Client发送请求过来的//所以Netty收到不同的请求时需要有不同的请求处理器来处理registerProcessor();if (initialized.compareAndSet(false, true)) {//初始化Netty网络服务器super.init();if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {//找到长连接管理器,对事务服务分组发起连接请求getClientChannelManager().reconnect(transactionServiceGroup);}}}
}//Netty client pool manager. Netty的网络连接管理器
class NettyClientChannelManager {...//Reconnect to remote server of current transaction service group.void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务服务分组获取到Seata Server的地址列表//比如根据事务服务分组名称到file.conf里去找,找到映射的名字如default//然后根据default找到Seata Server的地址列表availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}...}...private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);if (CollectionUtils.isEmpty(availInetSocketAddressList)) {return Collections.emptyList();}return availInetSocketAddressList.stream().map(NetUtil::toStringAddress).collect(Collectors.toList());}
}public class RegistryFactory {public static RegistryService getInstance() {return RegistryFactoryHolder.INSTANCE;}private static class RegistryFactoryHolder {private static final RegistryService INSTANCE = buildRegistryService();}private static RegistryService buildRegistryService() {//接下来构建Seata注册中心服务RegistryServiceRegistryType registryType;String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR + ConfigurationKeys.FILE_ROOT_TYPE);try {registryType = RegistryType.getType(registryTypeName);} catch (Exception exx) {throw new NotSupportYetException("not support registry type: " + registryTypeName);}//通过SPI机制进行加载,比如加载到FileRegistryServiceImpl实现类return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();}
}public class FileRegistryServiceImpl implements RegistryService<ConfigChangeListener> {...@Overridepublic List<InetSocketAddress> lookup(String key) throws Exception {String clusterName = getServiceGroup(key);if (clusterName == null) {return null;}String endpointStr = CONFIG.getConfig(PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);if (StringUtils.isNullOrEmpty(endpointStr)) {throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");}String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();for (String endpoint : endpoints) {String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);if (ipAndPort.length != 2) {throw new IllegalArgumentException("endpoint format should like ip:port");}inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));}return inetSocketAddresses;}...
}

(2)Seata客户端发起与服务端的连接

Netty长连接管理器NettyClientChannelManager的acquireChannel()方法会尝试获取连接。如果没有存活的连接,则会在获取到锁之后通过NettyClientChannelManager的doConnect()方法来发起连接。注意:使用到了Apache的Common Pool公共对象池来管理发起的连接。

//Netty client pool manager. Netty的网络连接管理器
class NettyClientChannelManager {...//Reconnect to remote server of current transaction service group.void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//根据事务服务分组获取到Seata Server的地址列表//比如根据事务服务分组名称到file.conf里去找,找到映射的名字如default//然后根据default找到Seata Server的地址列表availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}//availList一般都不会为空if (CollectionUtils.isEmpty(availList)) {RegistryService registryService = RegistryFactory.getInstance();String clusterName = registryService.getServiceGroup(transactionServiceGroup);if (StringUtils.isBlank(clusterName)) {LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct", ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX, transactionServiceGroup);return;}if (!(registryService instanceof FileRegistryServiceImpl)) {LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);}return;}Set<String> channelAddress = new HashSet<>(availList.size());try {//尝试和每个Seata Server去建立一个长连接for (String serverAddress : availList) {try {acquireChannel(serverAddress);channelAddress.add(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);}}} finally {if (CollectionUtils.isNotEmpty(channelAddress)) {List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());for (String address : channelAddress) {String[] array = address.split(":");aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));}RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);} else {RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());}}}//Acquire netty client channel connected to remote server.Channel acquireChannel(String serverAddress) {Channel channelToServer = channels.get(serverAddress);if (channelToServer != null) {channelToServer = getExistAliveChannel(channelToServer, serverAddress);if (channelToServer != null) {return channelToServer;}}if (LOGGER.isInfoEnabled()) {LOGGER.info("will connect to {}", serverAddress);}Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());//获取锁之后发起连接synchronized (lockObj) {return doConnect(serverAddress);}}...private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();private Function<String, NettyPoolKey> poolKeyFunction;private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;...private Channel doConnect(String serverAddress) {Channel channelToServer = channels.get(serverAddress);if (channelToServer != null && channelToServer.isActive()) {return channelToServer;}Channel channelFromPool;try {NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);if (currentPoolKey.getMessage() instanceof RegisterTMRequest) {poolKeyMap.put(serverAddress, currentPoolKey);} else {NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());}}//发起连接,最终会调用到NettyPoolableFactory的makeObject()方法channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));channels.put(serverAddress, channelFromPool);} catch (Exception exx) {LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);throw new FrameworkException("can not register RM,err:" + exx.getMessage());}return channelFromPool;}...
}public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {private final AbstractNettyRemotingClient rpcRemotingClient;private final NettyClientBootstrap clientBootstrap;public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {this.rpcRemotingClient = rpcRemotingClient;this.clientBootstrap = clientBootstrap;}@Overridepublic Channel makeObject(NettyPoolKey key) {InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());if (LOGGER.isInfoEnabled()) {LOGGER.info("NettyPool create channel to " + key);}Channel tmpChannel = clientBootstrap.getNewChannel(address);long start = System.currentTimeMillis();Object response;Channel channelToServer = null;if (key.getMessage() == null) {throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());}try {response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());if (!isRegisterSuccess(response, key.getTransactionRole())) {rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());} else {channelToServer = tmpChannel;rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());}} catch (Exception exx) {if (tmpChannel != null) {tmpChannel.close();}throw new FrameworkException("register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:" + channelToServer);}return channelToServer;}...
}

11.RM分支事务资源管理器客户端初始化的源码

RmNettyRemotingClient初始化时,会注入一个DefaultResourceManager实例以便可以获取根据SPI机制加载的资源管理器,以及注入一个DefaultRMHandler实例以便可以获取根据SPI机制加载的事务消息处理器。

public class RMClient {public static void init(String applicationId, String transactionServiceGroup) {RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());rmNettyRemotingClient.init();}
}public class DefaultResourceManager implements ResourceManager {protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}private DefaultResourceManager() {initResourceManagers();}protected void initResourceManagers() {//通过SPI加载所有的ResourceManager资源管理器//比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXAList<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);  }}}...
}public class DefaultRMHandler extends AbstractRMHandler {protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap = new ConcurrentHashMap<>();private static class SingletonHolder {private static AbstractRMHandler INSTANCE = new DefaultRMHandler();}public static AbstractRMHandler get() {return DefaultRMHandler.SingletonHolder.INSTANCE;}protected DefaultRMHandler() {initRMHandlers();}protected void initRMHandlers() {//通过SPI加载所有的RMHandler事务消息处理器//比如:RMHandlerAT、RMHandlerTCC、RMHandlerSaga、RMHandlerXAList<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);if (CollectionUtils.isNotEmpty(allRMHandlers)) {for (AbstractRMHandler rmHandler : allRMHandlers) {allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);}}}...
}public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {private final AtomicBoolean initialized = new AtomicBoolean(false);private ResourceManager resourceManager;...@Overridepublic void init() {//registry processor,注册一些请求处理器registerProcessor();if (initialized.compareAndSet(false, true)) {//和TmNettyRemotingClient.init()的一样super.init();if (resourceManager != null && !resourceManager.getManagedResources().isEmpty() && StringUtils.isNotBlank(transactionServiceGroup)) {//和TmNettyRemotingClient.init()的一样getClientChannelManager().reconnect(transactionServiceGroup);}}}private void registerProcessor() {//1.registry rm client handle branch commit processorRmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);//2.registry rm client handle branch rollback processorRmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);//3.registry rm handler undo log processorRmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);//4.registry TC response processorClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);//5.registry heartbeat message processorClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}...
}

12.全局事务注解扫描器扫描Bean是否有Seata注解

由于GlobalTransactionScanner继承自Spring的AbstractAutoProxyCreator,所以Spring会把Spring Bean传递给GlobalTransactionScanner进行判断,也就是让GlobalTransactionScanner重写的wrapIfNecessary()方法进行判断。

重写的wrapIfNecessary()方法会判断传递过来的Bean的Class或方法上是否添加了Seata的注解,从而决定是否需要针对Bean的Class创建动态代理,从而实现对添加了Seata的注解的方法进行拦截。

对传入的Bean创建动态代理时,是通过调用其继承的父类Spring的AbstractAutoProxyCreator的wrapIfNecessary()方法进行创建的。

这些Seata的注解包括:@GlobalTransactional、@GlobalLock、@TwoPhaseBusinessAction、@LocalTCC。

//AbstractAutoProxyCreator:Spring的动态代理自动创建者
//ConfigurationChangeListener:关注配置变更事件的监听器
//InitializingBean:Spring Bean初始化回调
//ApplicationContextAware:用来获取Spring容器
//DisposableBean:支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {...//Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;//对添加了@GlobalTransactional注解的方法进行拦截的AOP拦截器private MethodInterceptor globalTransactionalInterceptor;...//The following will be scanned, and added corresponding interceptor://添加了如下注解的方法会被扫描到,然后方法会添加相应的拦截器进行拦截//TM://@see io.seata.spring.annotation.GlobalTransactional // TM annotation//Corresponding interceptor://@see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler//GlobalLock://@see io.seata.spring.annotation.GlobalLock // GlobalLock annotation//Corresponding interceptor://@see io.seata.spring.annotation.GlobalTransactionalInterceptor# handleGlobalLock(MethodInvocation, GlobalLock)  // GlobalLock handler//TCC mode://@see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface//@see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method//@see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser//Corresponding interceptor://@see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode@Override//由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,//所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;//让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,//从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {//do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxy//判断传递进来的Bean是否是TCC动态代理//服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);} else {//获取目标class的接口Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {//创建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理//接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理//这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {//Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}...private boolean existsAnnotation(Class<?>[] classes) {if (CollectionUtils.isNotEmpty(classes)) {for (Class<?> clazz : classes) {if (clazz == null) {continue;}//目标class是否被打了@GlobalTransactional注解GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}//检查目标Spring Bean的各个方法,通过反射拿到添加了注解的一个方法Method[] methods = clazz.getMethods();for (Method method : methods) {//如果方法上被加了如@GlobalTransactional注解,则返回truetrxAnno = method.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);if (lockAnno != null) {return true;}}}}return false;}...
}

13.Seata全局事务拦截器的创建和初始化

如果传入GlobalTransactionScanner全局事务注解扫描器的wrapIfNecessary()方法的Bean,添加了比如@GlobalTransactional的全局事务注解,那么wrapIfNecessary()方法就会创建一个全局事务注解拦截器GlobalTransactionalInterceptor。

这个全局事务注解拦截器会被存放在GlobalTransactionScanner实例里的两个变量中:interceptor和globalTransactionalInterceptor。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {...public GlobalTransactionalInterceptor(FailureHandler failureHandler) {this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);if (degradeCheck) {ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);EVENT_BUS.register(this);if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {startDegradeCheck();}}this.initDefaultGlobalTransactionTimeout();}...
}

14.基于Spring AOP创建全局事务动态代理的源码

全局事务注解扫描器GlobalTransactionScanner的wrapIfNecessary()方法,发现传入的Bean含有Seata的注解,需要为该Bean创建动态代理时,会调用父类Spring的AbstractAutoProxyCreator的wrapIfNecessary()方法来创建。

AbstractAutoProxyCreator的wrapIfNecessary()方法,会通过子类GlobalTransactionScanner的getAdvicesAndAdvisorsForBean()方法,获取在GlobalTransactionScanner的wrapIfNecessary()方法中构建的拦截器(也就是全局事务注解的拦截器GlobalTransactionalInterceptor),然后创建传入的Bean的动态代理。

这样后续调用到传入Bean的方法时,就会先调用GlobalTransactionInterceptor拦截器。

//关注配置变更事件的监听器、Spring Bean初始化回调、感知到Spring容器、支持可抛弃Bean
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {...    //Spring AOP里对方法进行拦截的拦截器private MethodInterceptor interceptor;@Override//由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,//所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;//让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,//从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxy//判断传递进来的Bean是否是TCC动态代理//服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);} else {//获取目标class的接口Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {//构建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理//接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理//这样后续调用到目标Bean的方法,就会调用到GlobalTransactionInterceptor拦截器bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {// Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}//获取指定的拦截器@Overrideprotected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) throws BeansException {return new Object[]{interceptor};}...
}public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupportimplements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {...protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {return bean;}if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {return bean;}if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {this.advisedBeans.put(cacheKey, Boolean.FALSE);return bean;}// Create proxy if we have advice. 获取指定的拦截器Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);if (specificInterceptors != DO_NOT_PROXY) {this.advisedBeans.put(cacheKey, Boolean.TRUE);//创建动态代理Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));this.proxyTypes.put(cacheKey, proxy.getClass());return proxy;}this.advisedBeans.put(cacheKey, Boolean.FALSE);return bean;}//获取指定的拦截器protected abstract Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, @Nullable TargetSource customTargetSource) throws BeansException;...
}

15.全局事务注解扫描器的初始化总结

全局事务注解扫描器GlobalTransactionScanner的初始化主要做了如下三项工作:

一.初始化TM全局事务管理器客户端

二.初始化RM分支事务资源管理器客户端

三.对添加了Seata相关注解的Bean创建全局事务动态代理

相关文章:

Seata源码—3.全局事务注解扫描器的初始化二

大纲 1.全局事务注解扫描器继承的父类与实现的接口 2.全局事务注解扫描器的核心变量 3.Spring容器初始化后初始化Seata客户端的源码 4.TM全局事务管理器客户端初始化的源码 5.TM组件的Netty网络通信客户端初始化源码 6.Seata框架的SPI动态扩展机制源码 7.向Seata客户端注…...

Android Coli 3 ImageView load two suit Bitmap thumb and formal,Kotlin(七)

Android Coli 3 ImageView load two suit Bitmap thumb and formal&#xff0c;Kotlin&#xff08;七&#xff09; 在 Android Coli 3 ImageView load two suit Bitmap thumb and formal&#xff0c;Kotlin&#xff08;六&#xff09;-CSDN博客 的基础上改进&#xff0c;主要是…...

快速搭建一个electron-vite项目

1. 初始化项目 在命令行中运行以下命令 npm create quick-start/electronlatest也可以通过附加命令行选项直接指定项目名称和你想要使用的模版。例如&#xff0c;要构建一个 Electron Vue 项目&#xff0c;运行: # npm 7&#xff0c;需要添加额外的 --&#xff1a; npm cre…...

Python网络请求利器:urllib库深度解析

一、urllib库概述 urllib是Python内置的HTTP请求库&#xff0c;无需额外安装即可使用。它由四个核心模块构成&#xff1a; ​​urllib.request​​&#xff1a;发起HTTP请求的核心模块​​urllib.error​​&#xff1a;处理请求异常&#xff08;如404、超时等&#xff09;​​…...

2025认证杯第二阶段数学建模B题:谣言在社交网络上的传播思路+模型+代码

2025认证杯数学建模第二阶段思路模型代码&#xff0c;详细内容见文末名片 一、引言 在当今数字化时代&#xff0c;社交网络已然成为人们生活中不可或缺的一部分。信息在社交网络上的传播速度犹如闪电&#xff0c;瞬间就能触及大量用户。然而&#xff0c;这也为谣言的滋生和扩…...

IP地址、端口、TCP介绍、socket介绍、程序中socket管理

1、IP地址&#xff1a;IP 地址就是 标识网络中设备的一个地址&#xff0c;好比现实生活中的家庭地址。IP 地址的作用是 标识网络中唯一的一台设备的&#xff0c;也就是说通过IP地址能够找到网络中某台设备。 2、端口&#xff1a;代表不同的进程,如下图&#xff1a; 3、socket:…...

leetcode0621. 任务调度器-medium

1 题目&#xff1a;任务调度器 官方标定难度&#xff1a;中 给你一个用字符数组 tasks 表示的 CPU 需要执行的任务列表&#xff0c;用字母 A 到 Z 表示&#xff0c;以及一个冷却时间 n。每个周期或时间间隔允许完成一项任务。任务可以按任何顺序完成&#xff0c;但有一个限制…...

中小型培训机构都用什么教务管理系统?

在教育培训行业快速发展的今天&#xff0c;中小型培训机构面临着学员管理复杂、课程体系多样化、教学效果难以量化等挑战。一个高效的教务管理系统已成为机构运营的核心支撑。本文将深入分析当前市场上适用于中小型培训机构的教务管理系统&#xff0c;重点介绍爱耕云这一专业解…...

centos7 基于yolov10的推理程序环境搭建

这篇文章的前提是系统显卡驱动已经安装 安装步骤参照前一篇文章centos7安装NVIDIA显卡 安装Anaconda 下载地址anaconda.com 需要注册账号获取下载地址 wget https://repo.anaconda.com/archive/Anaconda3-2024.10-1-Linux-x86_64.sh赋予权限 chmod ax Anaconda3-2024.10-1-…...

Web GIS可视化地图框架Leaflet、OpenLayers、Mapbox、Cesium、ArcGis for JavaScript

Mapbox、OpenLayers、Leaflet、ArcGIS for JavaScript和Cesium是五种常用的Web GIS地图框架&#xff0c;它们各有优缺点&#xff0c;适用于不同的场景。还有常见的3d库和高德地图、百度地图。 1. Mapbox 官网Mapbox Gl JS案列&#xff1a;https://docs.mapbox.com/mapbox-gl-…...

Kafka如何实现高性能

Kafka如何实现高性能 Kafka之所以能成为高性能消息系统的标杆&#xff0c;是通过多层次的架构设计和优化实现的。 一、存储层优化 1. 顺序I/O设计 日志结构存储&#xff1a;所有消息追加写入&#xff0c;避免磁盘随机写分段日志&#xff1a;将日志分为多个Segment文件&…...

如何通过partclone克隆Ubuntu 22系统

如何通过partclone克隆Ubuntu 22系统 一. 背景知识&#xff1a;为什么要克隆系统&#xff1f;二. 准备工作详解2.1 选择工具&#xff1a;为什么是partclone&#xff1f;2.2 制作定制化ISO的深层原因 三. 详细操作步骤3.1 环境准备阶段3.2 ISO改造关键步骤3.3 启动到Live环境3.4…...

语义化路径是什么意思,举例说明

下面的java代码输出结果是/a/b/../c/./a.txt/a/c/a.txt&#xff0c;语义化路径是什么意思呢&#xff1f;代码如下所示&#xff1a; import org.springframework.util.StringUtils; public class StringUtilsTest { /** 字符串处理 */ Test public void …...

Dockerfile构建镜像

Dockerfile 构建镜像 # 使用本地已下载的 java:8-alpine 镜像作为基础镜像 FROM java:8-alpine# 设置工作目录 WORKDIR /home/www/shop# 复制 JAR 文件到容器中 COPY ./fkshop-build.jar /home/www/shop/fkshop-build.jar# 复制配置文件&#xff08;如果需要&#xff09; COPY…...

vue3.0的name属性插件——vite-plugin-vue-setup-extend

安装 这个由于是在开发环境下的一个插件 帮助我们支持name属性 所以需要是-D npm i vite-plugin-vue-setup-extend -D在pasckjson中无法注释每个插件的用处 可以在vscode中下载一个JsonComments这样可以在json中添加注释方便日后维护和查阅API 引入 在vite.config.js中 im…...

gRPC为什么高性能

gRPC 之所以具备高性能的特性,主要得益于其底层设计中的多项关键技术优化。以下从协议、序列化、传输机制、并发模型等方面详细解析其高性能的原因: 1. 基于 HTTP/2 协议的核心优势 HTTP/2 是 gRPC 的传输基础,相较于 HTTP/1.x,它通过以下机制显著提升了效率: 多路复用(…...

进度管理高分论文

2022年&#xff0c;xx县开展紧密型县域医共体建设&#xff0c;将全县县、镇两级医疗机构组建成2家医共体&#xff0c;要求医共体内部实行行政、人员、财务、业务、信息、绩效、药械“七统一”管理。但是卫生系统整体信息化水平较低&#xff0c;业务系统互不相通&#xff0c;运营…...

每日算法刷题计划Day7 5.15:leetcode滑动窗口4道题,用时1h

一.定长滑动窗口 【套路】教你解决定长滑窗&#xff01;适用于所有定长滑窗题目&#xff01; 模版套路 1.题目描述 1.计算所有长度恰好为 k 的子串中&#xff0c;最多可以包含多少个元音字母 2.找出平均数最大且 长度为 k 的连续子数组&#xff0c;并输出该最大平均数。 3.…...

C++核心编程--1 内存分区模型

C程序执行时&#xff0c;内存可以划分为4部分 代码区&#xff1a;存放函数体的二进制代码 全局区&#xff1a;存放全局变量、静态变量、常量 栈区&#xff1a;局部变量、函数参数值&#xff0c;编译器自动分配和释放 堆区&#xff1a;程序员自己分配和释放 1.1 程序运行前…...

产品更新丨谷云科技 iPaaS 集成平台 V7.5 版本发布

五月&#xff0c;谷云科技 iPaaS 集成平台保持月度更新&#xff0c; V7.5 版本于近日正式发布。我们一起来看看新版本有哪些升级和优化。 核心新增功能&#xff1a;深化API治理&#xff0c;释放连接价值 API网关&#xff1a;全链路可控&#xff0c;精准管控业务状态 业务状态…...

【AI论文】对抗性后期训练快速文本到音频生成

摘要&#xff1a;文本到音频系统虽然性能不断提高&#xff0c;但在推理时速度很慢&#xff0c;因此对于许多创意应用来说&#xff0c;它们的延迟是不切实际的。 我们提出了对抗相对对比&#xff08;ARC&#xff09;后训练&#xff0c;这是第一个不基于蒸馏的扩散/流模型的对抗加…...

欧拉计划 Project Euler 73(分数有范围计数)题解

欧拉计划 Project Euler 73 题解 题干分数有范围计数 思路code 题干 分数有范围计数 考虑形如 n d \frac{n}{d} dn​的分数&#xff0c;其中 n n n和 d d d均为正整数。如果 n < d n<d n<d且其最大公约数为1&#xff0c;则称该分数为最简真分数。 将所有 d ≤ 8 d\l…...

Quic如何实现udp可靠传输

QUIC&#xff08;Quick UDP Internet Connections&#xff09;是由 Google 设计并被 IETF 标准化的传输层协议&#xff0c;它基于 UDP 实现&#xff0c;但提供了类似 TCP 的可靠性和更高级的功能&#xff08;如多路复用、0-RTT 握手、TLS 加密等&#xff09;。 尽管 UDP 是不可…...

本地文件操作 MCP (多通道处理) 使用案例

## 概述 文件操作 MCP (Multi-Channel Processing) 是一种用于高效处理本地文件的框架和库&#xff0c;它提供了并行处理、批量操作、监控和异常处理等功能。通过多通道架构&#xff0c;MCP 能够显著提高大规模文件操作的效率&#xff0c;特别适用于需要处理大量文件或大型文件…...

Blender 入门教程(三):骨骼绑定

一、前言 不知道大家有没有玩过一些单机游戏的 Mod&#xff0c;比如《侠盗猎车》里主角变成奥特曼&#xff0c;各种新能源汽车乱入等等。 这些都是别人对原有模型就行修改换皮&#xff0c;并重新绑定骨骼完成的&#xff0c;所以如果会了骨骼绑定后&#xff0c;你也就可以自己…...

Java 异常处理之 BufferOverflowException(BufferOverflowException 概述、常见发生场景、避免策略)

一、BufferOverflowException 概述 BufferOverflowException 是 Java NIO 包中的一个运行时异常&#xff0c;是 RuntimeException 的子类 public class BufferOverflowException extends RuntimeException {... }# 继承关系java.lang.Object-> java.lang.Throwable-> j…...

密码学实验:凯撒密码

密码学实验&#xff1a;凯撒密码 一、实验目的 掌握凯撒密码的数学原理&#xff1a;理解字符移位与模运算的结合&#xff0c;实现加解密算法。理解暴力破解本质&#xff1a;通过穷举有限密钥空间&#xff0c;掌握利用语言特征破解密文的方法。编程实践&#xff1a;用Python实…...

C40-指针

一 指针的引入 什么是指针:指针是一个变量&#xff0c;其值是另一个变量的内存地址 简单的使用地址输出一个变量: 代码示例 #include <stdio.h> int main() {int a10;printf("a的地址是:%p\n",&a);printf("a%d\n",*(&a)); //*号是取值运算符…...

Cloudflare防火墙拦截谷歌爬虫|导致收录失败怎么解决?

许多站长发现网站突然从谷歌搜索结果中“消失”&#xff0c;背后很可能是Cloudflare防火墙误拦截了谷歌爬虫&#xff08;Googlebot&#xff09;&#xff0c;导致搜索引擎无法正常抓取页面。 由于Cloudflare默认的防护规则较为严格&#xff0c;尤其是针对高频访问的爬虫IP&…...

3.3 掌握RDD分区

本实战任务旨在掌握Spark RDD 的分区操作&#xff0c;包括理解 RDD 分区的概念、作用、分区数量的确定原则以及如何通过自定义分区器来优化数据处理。通过创建一个 Maven 项目并编写 Scala 代码&#xff0c;实现了一个自定义的科目分区器 SubjectPartitioner&#xff0c;该分区…...

以项目的方式学QT开发(二)——超详细讲解(120000多字详细讲解,涵盖qt大量知识)逐步更新!

API 描述 函数原型 参数说明 push_back() 在 list 尾部 添加一个元素 void push_back(const T& value); value &#xff1a;要添 加到尾部的元 素 这个示例演示了如何创建 std::list 容器&#xff0c;并对其进行插入、删除和迭代操作。在实际应用中&am…...

linux备份与同步工具rsync

版权声明&#xff1a;原创作品&#xff0c;请勿转载&#xff01; 文章目录 版权声明&#xff1a;原创作品&#xff0c;请勿转载&#xff01; 实验环境介绍&#xff1a; 1.工具介绍 2.详细介绍 2.1 本地模式&#xff08;用得少&#xff09; 2.2 远程模式 2.3 守护进程模式…...

Ascend的aclgraph(九)AclConcreteGraph:e2e执行aclgraph

1回顾 前面的几章内容探讨了aclgraph运行过程中的涉及到的关键模块和技术。本章节将前面涉及到的模块串联起来&#xff0c;对aclgraph形成一个端到端的了解。 先给出端到端运行的代码&#xff0c;如下&#xff1a; import torch import torch_npu import torchair import log…...

2025 OceanBase 开发者大会全议程指南

5 月 17 日&#xff0c;第三届 OceanBase 开发者大会将在广州举办。 我们邀请数据库领军者与AI实践先锋&#xff0c;与开发者一起探讨数据库与 AI 协同创新的技术趋势&#xff0c;面对面交流 OceanBase 在 TP、AP、KV 及 AI 能力上的最新进展&#xff0c;深度体验“打破技术栈…...

【深度学习之四】知识蒸馏综述提炼

知识蒸馏综述提炼 目录 知识蒸馏综述提炼 前言 参考文献 一、什么是知识蒸馏&#xff1f; 二、为什么要知识蒸馏&#xff1f; 三、一点点理论 四、知识蒸馏代码 总结 前言 知识蒸馏作为一种新兴的、通用的模型压缩和迁移学习架构&#xff0c;在最近几年展现出蓬勃的活力…...

Java大师成长计划之第23天:Spring生态与微服务架构之服务发现与注册中心

&#x1f4e2; 友情提示&#xff1a; 本文由银河易创AI&#xff08;https://ai.eaigx.com&#xff09;平台gpt-4-turbo模型辅助创作完成&#xff0c;旨在提供灵感参考与技术分享&#xff0c;文中关键数据、代码与结论建议通过官方渠道验证。 在微服务架构中&#xff0c;服务发现…...

list简单模拟实现

成员变量迭代器&#xff08;重点&#xff09;ListIterator运算符重载begin、end 插入、删除inserterase头插、尾插、头删、尾删 operator->const_iterator拷贝构造operator析构函数完整代码 由于前面已经模拟实现了vector&#xff0c;所以这里关于一些函数实现就不会讲的过于…...

undefined reference to `typeinfo for DeviceAllocator‘

出现“undefined reference to typeinfo”链接错误的原因及解决方法如下&#xff1a; class DeviceAllocator { public:explicit DeviceAllocator(DeviceType device_type){};virtual void* allocate(size_t n) 0;virtual void deallocate(void* p) 0;~DeviceAllocator() d…...

动态规划问题 -- 多状态模型(买股票的最佳时机II)

目录 动态规划分析问题五步曲题目概述利用状态机推导状态转移方程式代码编写 动态规划分析问题五步曲 不清楚动态规划分析问题是哪关键的五步的少年们可以移步到 链接: 动态规划算法基础 这篇文章非常详细的介绍了动态规划算法是如何分析和解决问题的 题目概述 链接: 买股票的最…...

【落羽的落羽 C++】进一步认识模板

文章目录 一、非类型模板参数二、模板的特化1. 函数模板特化2. 类模板特化 三、模板的编译分离 一、非类型模板参数 模板参数可以分为类型参数和非类型参数。我们之前使用的都是类型参数&#xff0c;即出现在模板参数列表中&#xff0c;跟在class或typename之类的参数类型名称…...

Java爬虫能处理京东商品数据吗?

Java爬虫完全可以处理京东商品数据。通过Java爬虫技术&#xff0c;可以高效地获取京东商品的详细信息&#xff0c;包括商品名称、价格、图片、描述等。这些信息对于市场分析、选品上架、库存管理和价格策略制定等方面具有重要价值。以下是一个完整的Java爬虫示例&#xff0c;展…...

#跟着若城学鸿蒙# web篇-初探

前言 先看下官方介绍&#xff0c;这里总结了比较重要的几点Web组件基础&#xff1a;加载与渲染网页全面解析Web组件是现代应用开发中不可或缺的重要元素&#xff0c;它允许开发者在原生应用中无缝集成Web内容。本文将全面介绍Web组件的基本功能&#xff0c;包括多种内容加载方…...

Top-p采样:解锁语言模型的创意之门

Top - p采样 是什么&#xff1a;核采样&#xff1a;排序&#xff0c;累计到0.7&#xff0c;随机选择 在自然语言生成和大规模语言模型推理中&#xff0c;Top - p采样&#xff08;又叫核采样&#xff0c;Nucleus Sampling&#xff09;是一种基于累积概率的采样策略。 Top - p介…...

周赛好题推荐

这周周赛很有质量的&#xff0c;上了一个很有意思的数学题目&#xff0c;推了半天..... 给定一个区间[l,r]&#xff0c;求出区间内所有满足x mod 2^i !k的所有正整数&#xff08;最后全部进行异或&#xff09; 首先我们不妨先算出[l,r]区间所有数字的异或&#xff0c;然后在算…...

【RabbitMQ】实现RPC通信的完整指南

文章目录 RPC 通信创建相关队列客户端代码声明队列发送请求接收响应完整代码 服务端代码设置同时只能获取一个消息接收消息完整代码 运行程序启动客户端启动服务端 RPC 通信 RPC (Remote Procedure Call), 即远过程调用。它是一种通过网络从远程计算机上请求服务&#xff0c;而…...

CK3588下安装linuxdeployqt qt6 arm64

参考资料&#xff1a; Linux —— linuxdeployqt源码编译与打包&#xff08;含出错解决&#xff09; linux cp指令报错&#xff1a;cp: -r not specified&#xff1b; cp: omitting directory ‘xxx‘&#xff08;需要加-r递归拷贝&#xff09; CMake Error at /usr/lib/x86_64…...

滑动窗口之二(优先队列)

原本滑动窗口的板子用的是数组和双指针模拟&#xff0c;我嫌麻烦还不好懂找了双端队列。但其实还是不太好使&#xff0c;比如今天的这道题就处理起来很麻烦。但是如果用优先队列的话就可以一直保证整个窗口是有序的&#xff0c;只需判断一下是否在窗口内即可。但是&#xff01;…...

小刚说C语言刷题—1088求两个数M和N的最大公约数

1.题目描述 求两个正整数 M 和 N 的最大公约数(M&#xff0c;N都在长整型范围内&#xff09; .输入 输入一行&#xff0c;包括两个正整数。 输出 输出只有一行&#xff0c;包括1个正整数。 样例 输入 45 60 输出 15 2.参考代码(C语言版) #include <stdio.h> …...

pytorch训练可视化工具---TensorBoard

一、目的&#xff1a;为什么使用 TensorBoard 调控模型 使用 TensorBoard 可以帮我们&#xff1a; 实时查看 loss / acc 曲线 → 判断是否过拟合、欠拟合&#xff1b; 对比不同模型或超参数的效果&#xff1b; 可视化模型结构 → 帮助调试模型设计&#xff1b; 查看权重/梯…...

丝杆升降机限位失灵深度剖析:从故障机理到智能监测方案

在工业自动化与精密机械传动领域&#xff0c;丝杆升降机凭借高精度、大推力的特性&#xff0c;成为产线设备的核心执行部件。然而&#xff0c;限位系统的可靠性直接决定设备安全运行与生产连续性。本文将从技术原理、故障诊断到智能监测方案&#xff0c;系统性解析丝杆升降机限…...