当前位置: 首页 > news >正文

Seata源码—6.Seata AT模式的数据源代理一

大纲

1.Seata的Resource资源接口源码

2.Seata数据源连接池代理的实现源码

3.Client向Server发起注册RM的源码

4.Client向Server注册RM时的交互源码

5.数据源连接代理与SQL句柄代理的初始化源码

6.Seata基于SQL句柄代理执行SQL的源码

7.执行SQL语句前取消自动提交事务的源码

8.执行SQL语句前后构建数据镜像的源码

9.构建全局锁的key和UndoLog数据的源码

10.Seata Client发起分支事务注册的源码

11.Seata Server处理分支事务注册请求的源码

12.将UndoLog写入到数据库与提交事务的源码

13.通过全局锁重试策略组件执行事务的提交

14.注册分支事务时获取全局锁的入口源码

15.Seata Server获取全局锁的具体逻辑源码

16.全局锁和分支事务及本地事务总结

17.提交全局事务以及提交各分支事务的源码

18.全局事务回滚的过程源码

1.Seata的Resource资源接口源码

数据源代理DataSourceProxy不仅实现了Seata的Resource资源接口,同时还继承了实现了SeataDataSourceProxy接口的抽象类AbstractDataSourceProxy。

由于SeataDataSourceProxy接口又继承自JDK提供的DataSource接口,所以通过数据源连接池DataSource接口的方法,可以获取数据源的连接。

注意:这里的数据源==数据库。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {...
}public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {...
}public interface SeataDataSourceProxy extends DataSource {...
}public interface DataSource extends CommonDataSource, Wrapper {//获取数据源连接Connection getConnection() throws SQLException;Connection getConnection(String username, String password) throws SQLException;
}

Seata的Resource资源接口有三个方法:

一.getResourceGroupId()方法用来获取资源分组

比如主从节点同属一个分组。

二.getResourceId()方法用来获取数据源ID

比如数据源连接URL可作为数据源ID。

三.getBranchType()方法用来获取分支事务类型

比如类型有:AT、TCC、SAGA、XA。

//Resource that can be managed by Resource Manager and involved into global transaction.
//资源是由RM资源管理组件来负责管理的
//RM资源管理器组件会负责把一个个的资源纳入到全局事务里去
//比如RM可以管理数据库资源,把一个数据库本地事务纳入到全局事务里去
public interface Resource {//Get the resource group id.//e.g. master and slave data-source should be with the same resource group id.//获取到资源分组ID//主从架构的数据源关联到同一个资源分组ID//比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组IDString getResourceGroupId();//Get the resource id.//e.g. url of a data-source could be the id of the db data-source resource.//比如数据源连接URL可以作为数据源的IDString getResourceId();//get resource type, AT, TCC, SAGA and XA//branchType表示分支事务类型:AT、TCC、SAGA、XABranchType getBranchType();
}

2.Seata数据源连接池代理的实现源码

(1)Seata的数据源连接池代理接口SeataDataSourceProxy

(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy

(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化

(1)Seata的数据源连接池代理接口SeataDataSourceProxy

SeataDataSourceProxy数据源代理在继承DataSource数据源连接池的基础上,增加了两个方法:一个是获取代理的目标数据源连接池的方法,一个是获取代理的目标数据源连接池对应的分支事务类型的方法。

public interface SeataDataSourceProxy extends DataSource {//Gets target data source. //获取代理的目标数据源连接池DataSource getTargetDataSource();//Gets branch type. //获取代理的目标数据源连接池对应的分支事务类型BranchType getBranchType();
}

(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy

AbstractDataSourceProxy抽象类的主要工作是封装代理的目标数据源连接池targetDataSource。

//The type Abstract data source proxy.
//AbstractDataSourceProxy主要的工作就是:
//封装了代理的目标数据源连接池targetDataSource
public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {//The Target data source.//代理目标的连接池,可以通过targetDataSource来获取连接protected DataSource targetDataSource;//Instantiates a new Abstract data source proxy.public AbstractDataSourceProxy(){ }//Instantiates a new Abstract data source proxy.public AbstractDataSourceProxy(DataSource targetDataSource) {this.targetDataSource = targetDataSource;}//Gets target data source.@Overridepublic DataSource getTargetDataSource() {return targetDataSource;}@Overridepublic <T> T unwrap(Class<T> iface) throws SQLException {return targetDataSource.unwrap(iface);}//判断目标连接池targetDataSource是否包装了指定的接口iface@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return targetDataSource.isWrapperFor(iface);}@Overridepublic PrintWriter getLogWriter() throws SQLException {return targetDataSource.getLogWriter();}@Overridepublic void setLogWriter(PrintWriter out) throws SQLException {targetDataSource.setLogWriter(out);}@Overridepublic void setLoginTimeout(int seconds) throws SQLException {targetDataSource.setLoginTimeout(seconds);}@Overridepublic int getLoginTimeout() throws SQLException {return targetDataSource.getLoginTimeout();}@Overridepublic Logger getParentLogger() throws SQLFeatureNotSupportedException {return targetDataSource.getParentLogger();}
}

(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化

初始化数据源连接池代理DataSourceProxy的具体逻辑是:首先从目标数据库连接池dataSource中获取一个数据库连接,然后根据这个数据库连接Connection去初始化jdbcUrl和dbType,接着根据数据库连接地址jdbcUrl初始化resourceId,然后把当前数据库连接池代理DataSourceProxy作为一个资源注册到默认的RM即DefaultResourceManager里去,最后设置RootContext上下文即线程本地变量副本中的分支事务类型。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxy.class);//默认资源分组IDprivate static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT";//Enable the table meta checker,默认是不启用的private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);//Table meta checker interval,默认是60sprivate static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);//资源组ID,比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组IDprivate String resourceGroupId;//代理的目标数据源连接url,这个数据源连接url也可以作为resourceIdprivate String jdbcUrl;//数据源ID,比如数据库连接url就可以作为一个数据源IDprivate String resourceId;//数据源类型private String dbType;//数据源连接用户名private String userName;//定时调度的线程池,定时检查表里的元数据private final ScheduledExecutorService tableMetaExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("tableMetaChecker", 1, true));//Instantiates a new Data source proxy.public DataSourceProxy(DataSource targetDataSource) {this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);}//Instantiates a new Data source proxy.//@param targetDataSource the target data source//@param resourceGroupId  the resource group idpublic DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {if (targetDataSource instanceof SeataDataSourceProxy) {LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();}this.targetDataSource = targetDataSource;init(targetDataSource, resourceGroupId);}//初始化数据源连接池代理DataSourceProxyprivate void init(DataSource dataSource, String resourceGroupId) {//资源分组IDthis.resourceGroupId = resourceGroupId;//从目标数据库连接池dataSource中,获取一个数据库连接try (Connection connection = dataSource.getConnection()) {//获取数据库连接connection里的元数据的连接urljdbcUrl = connection.getMetaData().getURL();//根据连接url获取到数据库类型dbType = JdbcUtils.getDbType(jdbcUrl);if (JdbcConstants.ORACLE.equals(dbType)) {//如果数据库类型等于oracle,则需要获取数据库连接connection的元数据的用户名userName = connection.getMetaData().getUserName();} else if (JdbcConstants.MARIADB.equals(dbType)) {//如果数据库类型等于mariadb,则需要对数据库类型进行赋值为MySQLdbType = JdbcConstants.MYSQL;}} catch (SQLException e) {throw new IllegalStateException("can not init dataSource", e);}//初始化资源ID,也就是获取数据库连接url来初始化resourceIDinitResourceId();//把当前数据库连接池代理,作为一个资源,注册到默认的RM里,也就是DefaultResourceManagerDefaultResourceManager.get().registerResource(this);if (ENABLE_TABLE_META_CHECKER_ENABLE) {tableMetaExecutor.scheduleAtFixedRate(() -> {try (Connection connection = dataSource.getConnection()) {TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());} catch (Exception ignore) {}}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);}//Set the default branch type to 'AT' in the RootContext.//设置RootContext上下文,即线程本地变量副本中的分支事务类型RootContext.setDefaultBranchType(this.getBranchType());}private void initResourceId() {if (JdbcConstants.POSTGRESQL.equals(dbType)) {initPGResourceId();} else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {initDefaultResourceId();resourceId = resourceId + "/" + userName;} else if (JdbcConstants.MYSQL.equals(dbType)) {initMysqlResourceId();} else {initDefaultResourceId();}}private void initMysqlResourceId() {String startsWith = "jdbc:mysql:loadbalance://";if (jdbcUrl.startsWith(startsWith)) {String url;if (jdbcUrl.contains("?")) {url = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));} else {url = jdbcUrl;}resourceId = url.replace(",", "|");} else {initDefaultResourceId();}}...
}

3.Client向Server发起注册RM的源码

初始化数据源连接池代理DataSourceProxy时,会将数据库连接池代理作为资源,注册到DefaultResourceManager资源管理器中。

而初始化DefaultResourceManager时,会通过SPI机制加载所有的ResourceManager。

因此在执行DataSourceProxy的init()方法进行初始化时,由于会调用DefaultResourceManager的registerResource()方法,所以最后会执行到DataSourceManager的registerResource()方法。

在DataSourceManager的registerResource()方法中,首先会把数据源连接池代理DataSourceProxy放入一个Map中进行缓存,然后通过RmNettyRemotingClient构造一个注册RM的请求把数据源连接池代理DataSourceProxy作为资源注册到Seata Server中。

public class DefaultResourceManager implements ResourceManager {//all resource managersprotected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}//Get resource manager.public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}private DefaultResourceManager() {initResourceManagers();}protected void initResourceManagers() {//init all resource managers//通过SPI加载所有的ResourceManager资源管理器//比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXAList<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);}}}@Overridepublic void registerResource(Resource resource) {getResourceManager(resource.getBranchType()).registerResource(resource);}public ResourceManager getResourceManager(BranchType branchType) {ResourceManager rm = resourceManagers.get(branchType);if (rm == null) {throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());}return rm;}...
}//The type Data source manager.
//DataSourceManager是AT模式下的资源管理器
public class DataSourceManager extends AbstractResourceManager {//异步化workerprivate final AsyncWorker asyncWorker = new AsyncWorker(this);//RM负责管理的一些resource资源private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();...@Overridepublic void registerResource(Resource resource) {DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;//根据资源ID和数据源代理,把数据源连接池代理DataSourceProxy放入到map里去dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);super.registerResource(dataSourceProxy);}...
}public abstract class AbstractResourceManager implements ResourceManager {...@Overridepublic void registerResource(Resource resource) {//通过RmNettyRemotingClient把RM注册到Seata Server中RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());}...
}

4.Client向Server注册RM时的交互源码

(1)Client异步发送注册RM的请求给Server

(2)Server收到注册RM的请求后的处理及异步响应

(1)Client异步发送注册RM的请求给Server

public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {...//Register new db key.public void registerResource(String resourceGroupId, String resourceId) {//Resource registration cannot be performed until the RM client is initializedif (StringUtils.isBlank(transactionServiceGroup)) {return;}if (getClientChannelManager().getChannels().isEmpty()) {getClientChannelManager().reconnect(transactionServiceGroup);return;}synchronized (getClientChannelManager().getChannels()) {//向每一个Server发起注册for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {String serverAddress = entry.getKey();Channel rmChannel = entry.getValue();if (LOGGER.isInfoEnabled()) {LOGGER.info("will register resourceId:{}", resourceId);}sendRegisterMessage(serverAddress, rmChannel, resourceId);}}}public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);message.setResourceIds(resourceId);try {//异步发送注册RM的请求super.sendAsyncRequest(channel, message);} catch (FrameworkException e) {if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {getClientChannelManager().releaseChannel(channel, serverAddress);if (LOGGER.isInfoEnabled()) {LOGGER.info("remove not writable channel:{}", channel);}} else {LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);}}}...
}public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {...@Overridepublic void sendAsyncRequest(Channel channel, Object msg) {if (channel == null) {LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");return;}RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);if (rpcMessage.getBody() instanceof MergeMessage) {mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());}super.sendAsync(channel, rpcMessage);}...
}public abstract class AbstractNettyRemoting implements Disposable {...//rpc async request.protected void sendAsync(Channel channel, RpcMessage rpcMessage) {channelWritableCheck(channel, rpcMessage.getBody());if (LOGGER.isDebugEnabled()) {LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());}doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {destroyChannel(future.channel());}});}...
}

(2)Server收到注册RM的请求后的处理及异步响应

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {...@ChannelHandler.Sharableclass ServerHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}//接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理processMessage(ctx, (RpcMessage) msg);}...}...
}public abstract class AbstractNettyRemoting implements Disposable {...//Rpc message processing.protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));}Object body = rpcMessage.getBody();if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;//根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的//processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());if (pair != null) {if (pair.getSecond() != null) {try {pair.getSecond().execute(() -> {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);} finally {MDC.clear();}});} catch (RejectedExecutionException e) {...}} else {try {pair.getFirst().process(ctx, rpcMessage);} catch (Throwable th) {LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);}}} else {LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());}} else {LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);}}...
}public class RegRmProcessor implements RemotingProcessor {...@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {onRegRmMessage(ctx, rpcMessage);}private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();//获取请求的发送地址String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());boolean isSuccess = false;String errorInfo = StringUtils.EMPTY;try {if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {//通过Channel管理组件ChannelManager,注册RM网络连接ChannelManager.registerRMChannel(message, ctx.channel());Version.putChannelVersion(ctx.channel(), message.getVersion());isSuccess = true;if (LOGGER.isDebugEnabled()) {LOGGER.debug("RM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());}} else {if (LOGGER.isWarnEnabled()) {LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());}}} catch (Exception exx) {isSuccess = false;errorInfo = exx.getMessage();LOGGER.error("RM register fail, error message:{}", errorInfo);}RegisterRMResponse response = new RegisterRMResponse(isSuccess);if (StringUtils.isNotEmpty(errorInfo)) {response.setMsg(errorInfo);}//返回响应给客户端remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);if (isSuccess && LOGGER.isInfoEnabled()) {LOGGER.info("RM register success,message:{},channel:{},client version:{}", message, ctx.channel(), message.getVersion());}}...
}public class ChannelManager {...public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel) throws IncompatibleVersionException {Version.checkVersion(resourceManagerRequest.getVersion());Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());RpcContext rpcContext;if (!IDENTIFIED_CHANNELS.containsKey(channel)) {rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),resourceManagerRequest.getResourceIds(), channel);rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);} else {rpcContext = IDENTIFIED_CHANNELS.get(channel);rpcContext.addResources(dbkeySet);}if (dbkeySet == null || dbkeySet.isEmpty()) { return; }for (String resourceId : dbkeySet) {String clientIp;ConcurrentMap<Integer, RpcContext> portMap =CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>()).computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>()).computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());rpcContext.holdInResourceManagerChannels(resourceId, portMap);updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());}}...
}public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {...@Overridepublic void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {Channel clientChannel = channel;if (!(msg instanceof HeartbeatMessage)) {clientChannel = ChannelManager.getSameClientChannel(channel);}if (clientChannel != null) {RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE);super.sendAsync(clientChannel, rpcMsg);} else {throw new RuntimeException("channel is error.");}}...
}public abstract class AbstractNettyRemoting implements Disposable {...//rpc async request.protected void sendAsync(Channel channel, RpcMessage rpcMessage) {channelWritableCheck(channel, rpcMessage.getBody());if (LOGGER.isDebugEnabled()) {LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());}doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {destroyChannel(future.channel());}});}...
}

5.数据源连接代理与SQL句柄代理的初始化源码

(1)数据库操作的三剑客之连接、句柄和结果

(2)数据源连接代理的初始化

(3)数据源连接代理对SQL进行预编译

(4)SQL句柄代理的初始化

(5)SQL句柄代理执行SQL

(1)数据库操作的三剑客之连接、句柄和结果

Seata Client或者Seata Server进行数据库操作的大致流程如下所示:

public class LogStoreDataBaseDAO implements LogStore {//The Log store data source. 数据源连接池protected DataSource logStoreDataSource = null;...@Overridepublic GlobalTransactionDO queryGlobalTransactionDO(long transactionId) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByTransactionId(globalTable);Connection conn = null;//连接PreparedStatement ps = null;//句柄ResultSet rs = null;//结果try {//1.从数据源连接池中获取数据源连接conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);//2.对sql语句进行预编译ps = conn.prepareStatement(sql);ps.setLong(1, transactionId);//3.执行sql语句rs = ps.executeQuery();if (rs.next()) {return convertGlobalTransactionDO(rs);} else {return null;}} catch (SQLException e) {throw new DataAccessException(e);} finally {IOUtil.close(rs, ps, conn);}}...
}

(2)数据源连接代理的初始化

Seata Client或者Seata Server进行数据库操作时,首先会通过数据库连接池代理DataSourceProxy获取数据库连接,也就是会通过DataSourceProxy的getConnection()方法获取数据源连接代理ConnectionProxy,其中就会根据获取到的一个数据源连接Connection初始化一个数据源连接代理ConnectionProxy。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {...@Overridepublic ConnectionProxy getConnection() throws SQLException {//从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回Connection targetConnection = targetDataSource.getConnection();return new ConnectionProxy(this, targetConnection);}@Overridepublic ConnectionProxy getConnection(String username, String password) throws SQLException {//从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回Connection targetConnection = targetDataSource.getConnection(username, password);return new ConnectionProxy(this, targetConnection);}...
}public class ConnectionProxy extends AbstractConnectionProxy {//Instantiates a new Connection proxy.public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {super(dataSourceProxy, targetConnection);}...
}public abstract class AbstractConnectionProxy implements Connection {//The Data source proxy. 数据源连接池代理protected DataSourceProxy dataSourceProxy;//The Target connection. 目标数据源连接protected Connection targetConnection;//Instantiates a new Abstract connection proxy.public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {this.dataSourceProxy = dataSourceProxy;this.targetConnection = targetConnection;}...
}

(3)数据源连接代理对SQL进行预编译

数据源连接代理ConnectionProxy在进行数据库操作时,获取到数据库连接Connection之后,就需要对要执行的SQL进行预编译,也就是会调用AbstractConnectionProxy的prepareStatement()方法。

public abstract class AbstractConnectionProxy implements Connection {...//对SQL进行预编译@Overridepublic PreparedStatement prepareStatement(String sql) throws SQLException {String dbType = getDbType();//support oracle 10.2+PreparedStatement targetPreparedStatement = null;//如果是AT模式if (BranchType.AT == RootContext.getBranchType()) {List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);if (sqlRecognizers != null && sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);targetPreparedStatement = getTargetConnection().prepareStatement(sql, pkNameArray);}}}if (targetPreparedStatement == null) {targetPreparedStatement = getTargetConnection().prepareStatement(sql);}//返回一个SQL句柄代理return new PreparedStatementProxy(this, targetPreparedStatement, sql);}...
}

(4)SQL句柄代理的初始化

SQL句柄代理PreparedStatementProxy的初始化主要是设置目标SQL、目标句柄和数据源连接代理。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {//Instantiates a new Prepared statement proxy.public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {super(connectionProxy, targetStatement, targetSQL);}...
}public abstract class AbstractPreparedStatementProxy extends StatementProxy<PreparedStatement> implements PreparedStatement {protected Map<Integer, ArrayList<Object>> parameters;private void initParameterHolder() {this.parameters = new HashMap<>();}//Instantiates a new Abstract prepared statement proxy.public AbstractPreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {super(connectionProxy, targetStatement, targetSQL);initParameterHolder();}...
}public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {//Instantiates a new Statement proxy.public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL) throws SQLException {super(connectionWrapper, targetStatement, targetSQL);}...
}public abstract class AbstractStatementProxy<T extends Statement> implements Statement {//The Connection proxy.protected AbstractConnectionProxy connectionProxy;//The Target statement.protected T targetStatement;//The Target sql.protected String targetSQL;...//Instantiates a new Abstract statement proxy.public AbstractStatementProxy(AbstractConnectionProxy connectionProxy, T targetStatement, String targetSQL) throws SQLException {this.connectionProxy = connectionProxy;this.targetStatement = targetStatement;this.targetSQL = targetSQL;}...
}

(5)SQL句柄代理执行SQL

从数据源连接池中获取到数据源连接,以及对SQL语句进行预编译后,就可以调用SQL句柄代理PreparedStatementProxy的executeQuery()等方法执行SQL语句。

6.Seata基于SQL句柄代理执行SQL的源码

(1)Spring的JdbcTemplate操作数据库的三剑客

(2)基于SQL句柄代理执行SQL的流程

(1)Spring的JdbcTemplate操作数据库的三剑客

连接、句柄和结果。

@Disabled
public class LocalTransactionWithGlobalLockDataSourceBasicTest {private static ClassPathXmlApplicationContext context;private static JdbcTemplate jdbcTemplate;@BeforeAllpublic static void before() {context = new ClassPathXmlApplicationContext("basic-test-context.xml");jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");}@Testpublic void testInsert() {RootContext.bindGlobalLockFlag();jdbcTemplate.update("insert into user0 (id, name, gmt) values (?, ?, ?)", new Object[]{2, "xxx", new Date()});}...
}public class JdbcTemplate extends JdbcAccessor implements JdbcOperations {...@Overridepublic int update(String sql, @Nullable Object... args) throws DataAccessException {return update(sql, newArgPreparedStatementSetter(args));}@Overridepublic int update(String sql, @Nullable PreparedStatementSetter pss) throws DataAccessException {return update(new SimplePreparedStatementCreator(sql), pss);}protected int update(final PreparedStatementCreator psc, @Nullable final PreparedStatementSetter pss) throws DataAccessException {logger.debug("Executing prepared SQL update");return updateCount(execute(psc, ps -> {try {if (pss != null) {pss.setValues(ps);}//PreparedStatement执行SQLint rows = ps.executeUpdate();if (logger.isTraceEnabled()) {logger.trace("SQL update affected " + rows + " rows");}return rows;} finally {if (pss instanceof ParameterDisposer) {((ParameterDisposer) pss).cleanupParameters();}}}, true));}@Nullableprivate <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources) throws DataAccessException {Assert.notNull(psc, "PreparedStatementCreator must not be null");Assert.notNull(action, "Callback object must not be null");if (logger.isDebugEnabled()) {String sql = getSql(psc);logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));}//1.获取连接Connection con = DataSourceUtils.getConnection(obtainDataSource());PreparedStatement ps = null;try {//2.创建句柄ps = psc.createPreparedStatement(con);applyStatementSettings(ps);//3.执行SQL的结果T result = action.doInPreparedStatement(ps);handleWarnings(ps);return result;} catch (SQLException ex) {if (psc instanceof ParameterDisposer) {((ParameterDisposer) psc).cleanupParameters();}String sql = getSql(psc);psc = null;JdbcUtils.closeStatement(ps);ps = null;DataSourceUtils.releaseConnection(con, getDataSource());con = null;throw translateException("PreparedStatementCallback", sql, ex);} finally {if (closeResources) {if (psc instanceof ParameterDisposer) {((ParameterDisposer) psc).cleanupParameters();}JdbcUtils.closeStatement(ps);DataSourceUtils.releaseConnection(con, getDataSource());}}}...
}

(2)基于SQL句柄代理执行SQL的流程

SQL句柄代理PreparedStatementProxy在调用execute()方法执行SQL时,就会调用到ExecuteTemplate执行模版的execute()方法。

而ExecuteTemplate执行模版的execute()方法,如果发现不需要全局锁 + 没有开启全局事务,那么就普通执行本地事务。否则,最终就会调用到BaseTransactionalExecutor的excute()方法。

在BaseTransactionalExecutor的excute()方法中,首先会从线程本地变量副本中获取xid,然后再执行SQL。

public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {...@Overridepublic boolean execute() throws SQLException {return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());}@Overridepublic ResultSet executeQuery() throws SQLException {return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());}@Overridepublic int executeUpdate() throws SQLException {return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());}...
}public class ExecuteTemplate {...public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {return execute(null, statementProxy, statementCallback, args);}public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {//如果发现不需要全局锁,而且没有开启AT模式下的全局事务,那么就普通执行本地事务if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {//Just work as original statementreturn statementCallback.execute(statementProxy.getTargetStatement(), args);}//获取到DB的类型String dbType = statementProxy.getConnectionProxy().getDbType();if (CollectionUtils.isEmpty(sqlRecognizers)) {sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);}Executor<T> executor;if (CollectionUtils.isEmpty(sqlRecognizers)) {executor = new PlainExecutor<>(statementProxy, statementCallback);} else {if (sqlRecognizers.size() == 1) {SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);switch (sqlRecognizer.getSQLType()) {case INSERT://通过SPI机制加载InsertExecutorexecutor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});break;case UPDATE:executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case DELETE:executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case SELECT_FOR_UPDATE:executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);break;case INSERT_ON_DUPLICATE_UPDATE:switch (dbType) {case JdbcConstants.MYSQL:case JdbcConstants.MARIADB:executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);break;default:throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");}break;default:executor = new PlainExecutor<>(statementProxy, statementCallback);break;}} else {executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);}}T rs;try {//比如下面最终会调用BaseTransactionalExecutor.excute()方法rs = executor.execute(args);} catch (Throwable ex) {if (!(ex instanceof SQLException)) {// Turn other exception into SQLExceptionex = new SQLException(ex);}throw (SQLException) ex;}return rs;}...
}@LoadLevel(name = JdbcConstants.MYSQL, scope = Scope.PROTOTYPE)
public class MySQLInsertExecutor extends BaseInsertExecutor implements Defaultable {...//Instantiates a new Abstract dml base executor.public MySQLInsertExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}...
}public abstract class BaseInsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> implements InsertExecutor<T> {...public BaseInsertExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}...
}public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}@Overridepublic T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交if (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}}...
}public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {//The Statement proxy.protected StatementProxy<S> statementProxy;//The Statement callback.protected StatementCallback<T, S> statementCallback;//The Sql recognizer.protected SQLRecognizer sqlRecognizer;...public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,SQLRecognizer sqlRecognizer) {this.statementProxy = statementProxy;this.statementCallback = statementCallback;this.sqlRecognizer = sqlRecognizer;}...@Overridepublic T execute(Object... args) throws Throwable {//获取xidString xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return doExecute(args);}//Do execute object.protected abstract T doExecute(Object... args) throws Throwable;...
}

7.执行SQL语句前取消自动提交事务的源码

执行ExecuteTemplate执行模版的execute()方法时,最终会调用到BaseTransactionalExecutor基础事务执行器的excute()方法。

执行BaseTransactionalExecutor的execute()方法时,又会执行到AbstractDMLBaseExecutor的doExecute()方法。该方法会判断目标数据库连接是否会自动提交本地事务,默认情况下本地事务都是自动提交的。如果是,则取消自动提交本地事务。

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {//The Statement proxy.protected StatementProxy<S> statementProxy;//The Statement callback.protected StatementCallback<T, S> statementCallback;//The Sql recognizer.protected SQLRecognizer sqlRecognizer;...public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,SQLRecognizer sqlRecognizer) {this.statementProxy = statementProxy;this.statementCallback = statementCallback;this.sqlRecognizer = sqlRecognizer;}...@Overridepublic T execute(Object... args) throws Throwable {//获取xidString xid = RootContext.getXID();if (xid != null) {statementProxy.getConnectionProxy().bind(xid);}statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());return doExecute(args);}//Do execute object.protected abstract T doExecute(Object... args) throws Throwable;...
}public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}@Overridepublic T doExecute(Object... args) throws Throwable {AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();//判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交if (connectionProxy.getAutoCommit()) {return executeAutoCommitTrue(args);} else {return executeAutoCommitFalse(args);}}...
}public abstract class AbstractConnectionProxy implements Connection {...@Overridepublic boolean getAutoCommit() throws SQLException {//判断目标数据库连接是否是自动提交,默认情况是都是自动提交的return targetConnection.getAutoCommit();}...
}public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...//Execute auto commit true t.protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//修改自动提交事务的设置,此时需要阻止自动提交事务connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {T result = executeAutoCommitFalse(args);//执行SQL语句connectionProxy.commit();//手动提交本地事务return result;});} catch (Exception e) {//when exception occur in finally,this exception will lost, so just print it hereLOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {connectionProxy.getTargetConnection().rollback();}throw e;} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}}...
}public class ConnectionProxy extends AbstractConnectionProxy {private final ConnectionContext context = new ConnectionContext();...//change connection autoCommit to false by seatapublic void changeAutoCommit() throws SQLException {getContext().setAutoCommitChanged(true);setAutoCommit(false);}//Gets context.public ConnectionContext getContext() {return context;}@Overridepublic void setAutoCommit(boolean autoCommit) throws SQLException {if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {//change autocommit from false to true, we should commit() first according to JDBC spec.doCommit();}//把目标数据源连接的自动提交事务设置为falsetargetConnection.setAutoCommit(autoCommit);}...
}

8.执行SQL语句前后构建数据镜像的源码

(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程

(2)以UpdateExecuto为例构建前后镜像

(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程

一.首先设置数据源连接阻止其自动提交事务

二.根据目标SQL语句构建beforeImage前镜像

三.执行目标SQL语句(但还没提交其对应的事务)

四.根据beforeImage前镜像构建afterImage后镜像

五.根据前镜像和后镜像构建UndoLog数据

六.手动提交数据源连接代理的事务

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {...//Execute auto commit true t.protected T executeAutoCommitTrue(Object[] args) throws Throwable {ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();try {//修改数据源连接的自动提交事务的设置,此时需要阻止自动提交事务connectionProxy.changeAutoCommit();return new LockRetryPolicy(connectionProxy).execute(() -> {T result = executeAutoCommitFalse(args);//执行SQL语句connectionProxy.commit();//手动提交本地事务return result;});} catch (Exception e) {// when exception occur in finally,this exception will lost, so just print it hereLOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {connectionProxy.getTargetConnection().rollback();}throw e;} finally {connectionProxy.getContext().reset();connectionProxy.setAutoCommit(true);}}//Execute auto commit false t.protected T executeAutoCommitFalse(Object[] args) throws Exception {if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {throw new NotSupportYetException("multi pk only support mysql!");}//根据目标SQL语句构建beforeImage,表示目标SQL执行前的数据镜像TableRecords beforeImage = beforeImage();//接下来真正去执行这条SQL语句,但是此时本地事务还不会提交T result = statementCallback.execute(statementProxy.getTargetStatement(), args);int updateCount = statementProxy.getUpdateCount();if (updateCount > 0) {//根据beforeImage构建afterImage,表示目标SQL执行后的数据镜像TableRecords afterImage = afterImage(beforeImage);//根据beforeImage和afterImage准备undoLog数据到数据源连接代理中prepareUndoLog(beforeImage, afterImage);}return result;}...
}

(2)以UpdateExecutor为例构建前后镜像

public class TableRecords implements java.io.Serializable {//表的元数据private transient TableMeta tableMeta;//表的名称private String tableName;//表的多行数据private List<Row> rows = new ArrayList<Row>();...
}public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {private static final Configuration CONFIG = ConfigurationFactory.getInstance();private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);//Instantiates a new Update executor.public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {super(statementProxy, statementCallback, sqlRecognizer);}@Overrideprotected TableRecords beforeImage() throws SQLException {ArrayList<List<Object>> paramAppenderList = new ArrayList<>();TableMeta tmeta = getTableMeta();//根据主键ID值拼接一个SQL语句,查询这条数据更新前的镜像String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);return buildTableRecords(tmeta, selectSQL, paramAppenderList);}private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;List<String> updateColumns = recognizer.getUpdateColumns();StringBuilder prefix = new StringBuilder("SELECT ");StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());String whereCondition = buildWhereCondition(recognizer, paramAppenderList);String orderByCondition = buildOrderCondition(recognizer, paramAppenderList);String limitCondition = buildLimitCondition(recognizer, paramAppenderList);if (StringUtils.isNotBlank(whereCondition)) {suffix.append(WHERE).append(whereCondition);}if (StringUtils.isNotBlank(orderByCondition)) {suffix.append(" ").append(orderByCondition);}if (StringUtils.isNotBlank(limitCondition)) {suffix.append(" ").append(limitCondition);}suffix.append(" FOR UPDATE");StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());if (ONLY_CARE_UPDATE_COLUMNS) {if (!containsPK(updateColumns)) {selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));}for (String columnName : updateColumns) {selectSQLJoin.add(columnName);}//The on update xxx columns will be auto update by db, so it's also the actually updated columnsList<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();onUpdateColumns.removeAll(updateColumns);for (String onUpdateColumn : onUpdateColumns) {selectSQLJoin.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));}} else {for (String columnName : tableMeta.getAllColumns().keySet()) {selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));}}return selectSQLJoin.toString();}@Overrideprotected TableRecords afterImage(TableRecords beforeImage) throws SQLException {TableMeta tmeta = getTableMeta();if (beforeImage == null || beforeImage.size() == 0) {return TableRecords.empty(getTableMeta());}String selectSQL = buildAfterImageSQL(tmeta, beforeImage);ResultSet rs = null;try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);rs = pst.executeQuery();return TableRecords.buildRecords(tmeta, rs);} finally {IOUtil.close(rs);}}private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {StringBuilder prefix = new StringBuilder("SELECT ");String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());String suffix = " FROM " + getFromTableInSQL() + " WHERE " + whereSql;StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);if (ONLY_CARE_UPDATE_COLUMNS) {SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;List<String> updateColumns = recognizer.getUpdateColumns();if (!containsPK(updateColumns)) {selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));}for (String columnName : updateColumns) {selectSQLJoiner.add(columnName);}//The on update xxx columns will be auto update by db, so it's also the actually updated columnsList<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();onUpdateColumns.removeAll(updateColumns);for (String onUpdateColumn : onUpdateColumns) {selectSQLJoiner.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));}} else {for (String columnName : tableMeta.getAllColumns().keySet()) {selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));}}return selectSQLJoiner.toString();}
}

9.构建全局锁的key和UndoLog数据的源码

(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据

(2)构建全局锁的key的源码

(3)构建UndoLog数据的源码

(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据

在基础事务执行器BaseTransactionalExecutor的prepareUndoLog()方法中,会构建全局锁的key和构建UndoLog数据,并把它们设置到数据源连接代理ConnectionProxy中。

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {...//prepare undo log.//@param beforeImage the before image//@param afterImage  the after imageprotected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {return;}if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {if (beforeImage.getRows().size() != afterImage.getRows().size()) {throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");}}ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;//构建全局锁的key//比如更新了一批数据,那么需要针对这批数据的主键ID,来构建这批数据的全局锁的keyString lockKeys = buildLockKey(lockKeyRecords);if (null != lockKeys) {//将全局锁key设置到数据源连接代理ConnectionProxy中connectionProxy.appendLockKey(lockKeys);//构建UndoLogSQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);//将UndoLog设置到数据源连接代理ConnectionProxy中connectionProxy.appendUndoLog(sqlUndoLog);}}...
}

(2)构建全局锁的key的源码

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {...//build lockKey//@param rowsIncludingPK the records//@return the string as local key. the local key example(multi pk): "t_user:1_a,2_b"protected String buildLockKey(TableRecords rowsIncludingPK) {if (rowsIncludingPK.size() == 0) {return null;}//构建出来的全局锁的key形式为:table_name:id_11001StringBuilder sb = new StringBuilder();sb.append(rowsIncludingPK.getTableMeta().getTableName());sb.append(":");int filedSequence = 0;//pksRows指的是,更新的每一行数据主键字段和主键的值List<Map<String, Field>> pksRows = rowsIncludingPK.pkRows();//获取到主键字段名称,主键可能是联合主键,主键字段的名称可能有多个List<String> primaryKeysOnlyName = getTableMeta().getPrimaryKeyOnlyName();//rowMap就是一行数据,rowMap中的key是字段名称,value是字段值for (Map<String, Field> rowMap : pksRows) {int pkSplitIndex = 0;//遍历和提取这行数据里多个主键字段的名称for (String pkName : primaryKeysOnlyName) {if (pkSplitIndex > 0) {sb.append("_");}//获取到多个主键字段的value,然后拼接在一起sb.append(rowMap.get(pkName).getValue());pkSplitIndex++;}filedSequence++;if (filedSequence < pksRows.size()) {sb.append(",");}}//最终拼成的key形如:table_name:1101_aadd,table_name:xxxx_xxxreturn sb.toString();}...
}

(3)构建UndoLog数据的源码

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {...//build a SQLUndoLog//@param beforeImage the before image//@param afterImage  the after imageprotected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {SQLType sqlType = sqlRecognizer.getSQLType();String tableName = sqlRecognizer.getTableName();SQLUndoLog sqlUndoLog = new SQLUndoLog();sqlUndoLog.setSqlType(sqlType);//SQL的类型可能为insert、update、deletesqlUndoLog.setTableName(tableName);//表的名称sqlUndoLog.setBeforeImage(beforeImage);//SQL执行前的数据镜像sqlUndoLog.setAfterImage(afterImage);//SQL执行后的数据镜像return sqlUndoLog;}...
}public class SQLUndoLog implements java.io.Serializable {private SQLType sqlType;private String tableName;private TableRecords beforeImage;private TableRecords afterImage;...
}

10.Seata Client发起分支事务注册的源码

(1)ConnectionProxy.commit()提交事务

(2)ConnectionProxy.register()注册分支事务

(1)ConnectionProxy.commit()提交事务

执行数据源连接代理ConnectionProxy的commit()方法提交事务的时候,首先会先调用数据源连接代理ConnectionProxy的register()方法注册分支事务。

public class ConnectionProxy extends AbstractConnectionProxy {private final ConnectionContext context = new ConnectionContext();...@Overridepublic void commit() throws SQLException {try {//通过全局锁重试策略组件来执行本地事务的提交lockRetryPolicy.execute(() -> {doCommit();return null;});} catch (SQLException e) {if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {rollback();}throw e;} catch (Exception e) {throw new SQLException(e);}}private void doCommit() throws SQLException {if (context.inGlobalTransaction()) {processGlobalTransactionCommit();} else if (context.isGlobalLockRequire()) {processLocalCommitWithGlobalLocks();} else {targetConnection.commit();}}private void processLocalCommitWithGlobalLocks() throws SQLException {//检查全局锁keyscheckLock(context.buildLockKeys());try {//目标数据源连接提交事务targetConnection.commit();} catch (Throwable ex) {throw new SQLException(ex);}context.reset();}private void processGlobalTransactionCommit() throws SQLException {try {//注册分支事务register();} catch (TransactionException e) {recognizeLockKeyConflictException(e, context.buildLockKeys());}try {UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);//目标数据源连接提交事务targetConnection.commit();} catch (Throwable ex) {LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);report(false);throw new SQLException(ex);}if (IS_REPORT_SUCCESS_ENABLE) {report(true);}context.reset();}...
}

(2)ConnectionProxy.register()注册分支事务

执行数据源连接代理ConnectionProxy的register()方法注册分支事务的时候,会调用资源管理器DefaultResourceManager的branchRegister()方法,然后会继续调用AbstractResourceManager的branchRegister()方法来注册分支事务。

在AbstractResourceManager的branchRegister()方法中,首先会构造分支事务注册请求,然后通过RmNettyRemotingClient将分支事务注册请求发送给Seata Server。

//The type Connection proxy.
//数据源连接代理
public class ConnectionProxy extends AbstractConnectionProxy {private final ConnectionContext context = new ConnectionContext();...private void register() throws TransactionException {if (!context.hasUndoLog() || !context.hasLockKey()) {return;}//分支事务注册Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT,//事务类型getDataSourceProxy().getResourceId(),//资源id,资源是已经注册过了的null,context.getXid(),context.getApplicationData(),context.buildLockKeys()//注册分支事物时带上全局锁keys);context.setBranchId(branchId);}...
}public class DefaultResourceManager implements ResourceManager {protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}private DefaultResourceManager() {initResourceManagers();}protected void initResourceManagers() {//通过SPI加载所有的ResourceManager资源管理器//比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXAList<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);}}}//注册分支事务@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);}public ResourceManager getResourceManager(BranchType branchType) {ResourceManager rm = resourceManagers.get(branchType);if (rm == null) {throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());}return rm;}...
}public abstract class AbstractResourceManager implements ResourceManager {...@Overridepublic Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {try {BranchRegisterRequest request = new BranchRegisterRequest();request.setXid(xid);//xid是全局事务idrequest.setLockKey(lockKeys);//这次分支事务要更新数据全局锁keyrequest.setResourceId(resourceId);//分支事务对应的资源idrequest.setBranchType(branchType);//分支事务类型request.setApplicationData(applicationData);//应用数据BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);if (response.getResultCode() == ResultCode.Failed) {throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));}return response.getBranchId();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);}}...
}

相关文章:

Seata源码—6.Seata AT模式的数据源代理一

大纲 1.Seata的Resource资源接口源码 2.Seata数据源连接池代理的实现源码 3.Client向Server发起注册RM的源码 4.Client向Server注册RM时的交互源码 5.数据源连接代理与SQL句柄代理的初始化源码 6.Seata基于SQL句柄代理执行SQL的源码 7.执行SQL语句前取消自动提交事务的源…...

Spring-Beans的生命周期的介绍

目录 1、Spring核心组件 2、Bean组件 2.1、Bean的定义 2.2、Bean的生命周期 1、实例化 2、属性填充 3、初始化 4、销毁 2.3、Bean的执行时间 2.4、Bean的作用域 3、常见问题解决方案 4、与Java对象区别 前言 关于bean的生命周期&#xff0c;如下所示&#xff1a; …...

目标检测新突破:用MSBlock打造更强YOLOv8

文章目录 YOLOv8的现状与挑战YOLO-MS的MSBlock简介MSBlock的工作原理MSBlock的优势 利用MSBlock改进YOLOv8替换YOLOv8主干网络中的部分模块代码实现&#xff1a;替换CSP模块为MSBlock 在YOLOv8的颈部&#xff08;Neck&#xff09;中插入MSBlock代码实现&#xff1a;在颈部区域插…...

[SpringBoot]Spring MVC(4.0)

获取Header 传统获取 header 从 HttpServletRequest 中获取 RequestMapping("/r8")public String r8(HttpServletRequest request) {String userAgent request.getHeader("User-Agent");return "userAgent: "userAgent;}使用浏览器访问后&…...

Linux概述:从内核到开源生态

Linux概述&#xff1a;从内核到开源生态 Linux 是当今计算机领域最核心的开源操作系统内核&#xff0c;其影响力已渗透到服务器、嵌入式设备、云计算甚至超级计算机等各个领域。本章将深入解析Linux的本质、核心架构及其背后的开源哲学。 1. Linux的本质&#xff1a;不只是“操…...

【ubuntu24.04】pycharm 死机结束进程

windows 远程pycharm到ubuntu执行程序 pycharm 在调试过程中&#xff0c;内存耗尽&#xff0c;然后死机了 pycharm 进程 (base) rootk8s-master-pfsrv:/home/zhangbin/下载# ps -ef | grep pycharm root 121245 3230568 0 5月14 pts/8 00:00:00 /bin/bash --rcfile …...

【PRB】深度解析GaN中最浅的受主缺陷

2025 年 1 月 16 日,Virginia Commonwealth University 的 M. A. Reshchikov 和 SUNY–Albany 的 B. McEwen 等人在《Physical Review B》期刊发表了题为《Identity of the shallowest acceptor in GaN》的文章,基于对 50 多个 Be 掺杂 GaN 样品的光致发光实验以及 Heyd-Scus…...

Flink CEP是什么?

Apache Flink 的 CEP&#xff08;Complex Event Processing&#xff0c;复杂事件处理&#xff09; 是 Flink 提供的一个库&#xff0c;用于在无界数据流中检测符合特定模式的事件组合。 &#x1f3af; 一、什么是 CEP&#xff1f; ✅ 定义&#xff1a; CEP 是一种从连续的数据…...

基于STM32的多传感器融合的设施农业小型搬运机器人避障控制系统设计

一、系统总体设计目标 针对设施农业场景中狭窄通道、障碍物多样(如农机具、作物植株、水管)的特点,设计一款基于 STM32 的小型搬运机器人避障控制系统。系统通过多传感器融合实现 360 环境感知,采用模糊 PID 控制算法实现平滑避障,满足温室、大棚等场景的搬运需求。 二、…...

从零开始实现大语言模型(十六):加载开源大语言模型参数

1. 前言 预训练大语言模型的难点不在于算法&#xff0c;而在于数据和算力&#xff0c;绝大多数企业和机构都没有预训练大语言模型的算力资源。在工业界的大语言模型应用实践中&#xff0c;通常会使用领域数据微调开源大语言模型参数&#xff0c;以构建领域大语言模型。 本文介…...

Spark,数据提取和保存

以下是使用 Spark 进行数据提取&#xff08;读取&#xff09;和保存&#xff08;写入&#xff09;的常见场景及代码示例&#xff08;基于 Scala/Java/Python&#xff0c;不含图片操作&#xff09;&#xff1a; 一、数据提取&#xff08;读取&#xff09; 1. 读取文件数据&a…...

java19

1.集合体系结构 注意&#xff1a; 2.collection遍历之迭代器遍历 一次循环只能一次next方法的原因&#xff1a; 原因&#xff1a;集合长度是单数就报错 3.collection遍历之增强for遍历 如何代码简写呢&#xff1a;集合名.for回车 4.collection遍历之Lambda表达式遍历 5.使用多态…...

从0到1吃透卷积神经网络(CNN):原理与实战全解析

一、开篇&#xff1a;CNN 在 AI 领域的地位 在当今人工智能&#xff08;AI&#xff09;飞速发展的时代&#xff0c;卷积神经网络&#xff08;Convolutional Neural Network&#xff0c;简称 CNN&#xff09;无疑是深度学习领域中最为耀眼的明星之一 。它就像是 AI 世界里的超级…...

建一个结合双向长短期记忆网络(BiLSTM)和条件随机场(CRF)的模型

构建一个结合双向长短期记忆网络&#xff08;BiLSTM&#xff09;和条件随机场&#xff08;CRF&#xff09;的模型&#xff0c;通常用于序列标注任务&#xff0c;如命名实体识别&#xff08;NER&#xff09;、词性标注&#xff08;POS Tagging&#xff09;等。下面我将通过口述的…...

mvc-ioc实现

IOC 1&#xff09;耦合/依赖 依赖&#xff0c;是谁离不开谁 就比如上诉的Controller层必须依赖于Service层&#xff0c;Service层依赖于Dao 在软件系统中&#xff0c;层与层之间存在依赖。我们称之为耦合 我们系统架构或者设计的一个原则是&#xff…...

符合Python风格的对象(再谈向量类)

再谈向量类 为了说明用于生成对象表示形式的众多方法&#xff0c;我们将使用一个 Vector2d 类&#xff0c;它与第 1 章中的类似。这一节和接下来的几节会不断实 现这个类。我们期望 Vector2d 实例具有的基本行为如示例 9-1 所示。 示例 9-1 Vector2d 实例有多种表示形式 &g…...

4.1.8文件共享

知识总览 基于索引节点的共享方式(硬链接)&#xff1a; 让不同用户的文件目录项指向同一个文件的索引节点 用户1创建文件1&#xff0c;并让文件目录项aaa指向了文件1&#xff0c;这个文件对应了一个索引节点&#xff0c;这个索引节点 包含了文件的物理地址和文件的其他属性信…...

[LevelDB]LevelDB版本管理的黑魔法-为什么能在不锁表的情况下管理数据?

文章摘要 LevelDB的日志管理系统是怎么通过双链表来进行数据管理为什么LevelDB能够在不锁表的情况下进行日志新增 适用人群: 对版本管理机制有开发诉求&#xff0c;并且希望参考LevelDB的版本开发机制。数据库相关从业者的专业人士。计算机狂热爱好者&#xff0c;对计算机的…...

普通用户的服务器连接与模型部署相关记录

普通用户的服务器连接与模型部署相关记录 一、从登录到使用自己的conda 1.账号登陆&#xff1a; ssh xxx172.31.226.236 2.下载与安装conda&#xff1a; 下载conda&#xff1a; wget -c https://repo.anaconda.com/archive/Anaconda3-2023.03-1-Linux-x86_64.sh 安装con…...

WebSocket解决方案的一些细节阐述

今天我们来看看WebSocket解决方案的一些细节问题&#xff1a; 实际上&#xff0c;集成WebSocket的方法都有相关的工程挑战&#xff0c;这可能会影响项目成本和交付期限。在最简单的层面上&#xff0c;构建 WebSocket 解决方案似乎是添加接收实时更新功能的前进方向。但是&…...

架构思维:构建高并发扣减服务_分布式无主架构

文章目录 Pre无主架构的任务简单实现分布式无主架构 设计和实现扣减中的返还什么是扣减的返还返还实现原则原则一&#xff1a;扣减完成才能返还原则二&#xff1a;一次扣减可以多次返还原则三&#xff1a;返还的总数量要小于等于原始扣减的数量原则四&#xff1a;返还要保证幂等…...

C++函数基础:定义与调用函数,参数传递(值传递、引用传递)详解

1. 引言 函数是C编程中的核心概念之一&#xff0c;它允许我们将代码模块化&#xff0c;提高代码的可读性、复用性和可维护性。本文将深入探讨&#xff1a; 函数的定义与调用参数传递方式&#xff08;值传递 vs 引用传递&#xff09;应用场景与最佳实践 2. 函数的定义与调用 …...

深入解析Python中的Vector2d类:从基础实现到特殊方法的应用

引言 在Python面向对象编程中&#xff0c;特殊方法&#xff08;或称魔术方法&#xff09;是实现对象丰富行为的关键。本文将以Vector2d类为例&#xff0c;详细讲解如何通过特殊方法为自定义类添加多种表示形式和操作能力。 Vector2d类的基本行为 Vector2d类是一个二维向量类…...

【25软考网工】第六章(7)网络安全防护系统

博客主页&#xff1a;christine-rr-CSDN博客 ​​专栏主页&#xff1a;软考中级网络工程师笔记 ​​​ 大家好&#xff0c;我是christine-rr !目前《软考中级网络工程师》专栏已经更新三十多篇文章了&#xff0c;每篇笔记都包含详细的知识点&#xff0c;希望能帮助到你&#x…...

Mac下载bilibili视频

安装 安装 yt-dlp brew install yt-dlp安装FFmpeg 用于合并音视频流、转码等操作 brew install ffmpeg使用 下载单个视频 查看可用格式 yt-dlp -F --cookies-from-browser chrome "https://www.bilibili.com/video/BV15B4y1G7F3?spm_id_from333.788.recommend_more_vid…...

6个月Python学习计划:从入门到AI实战(前端开发者进阶指南)

作者&#xff1a;一名前端开发者的进阶日志 计划时长&#xff1a;6个月 每日学习时间&#xff1a;2小时 覆盖方向&#xff1a;Python基础、爬虫开发、数据分析、后端开发、人工智能、深度学习 &#x1f4cc; 目录 学习目标总览每日时间分配建议第1月&#xff1a;Python基础与编…...

批量处理 Office 文档 高画质提取图片、视频、音频素材助手

各位办公小能手们&#xff01;你们有没有遇到过想从 Office 文档里提取图片、音频和视频&#xff0c;却又搞得焦头烂额的情况&#xff1f;今天就给大家介绍一款超厉害的工具——OfficeImagesExtractor&#xff01; 这货的核心功能那可真是杠杠的&#xff01;首先是高画质提取&a…...

【甲方安全建设】Python 项目静态扫描工具 Bandit 安装使用详细教程

文章目录 一、工具简介二、工具特点1.聚焦安全漏洞检测2.灵活的扫描配置3.多场景适配4.轻量且社区活跃三、安装步骤四、使用方法场景1:扫描单个Python文件场景2:递归扫描整个项目目录五、结果解读六、总结一、工具简介 Bandit 是由Python官方推荐的静态代码分析工具(SAST)…...

【推荐】新准则下对照会计报表172个会计科目解释

序号 科目名称 对应的会计报表项目 序号 科目名称 对应的会计报表项目   一、资产类     二、负债类   1 1001 库存现金 货币资金 103 2001 短期借款 短期借款 2 1002 银行存款 货币资金 104 2101 交易性金融负债 易性金融负债 3 1012 其他货币资…...

IntelliJ IDEA设置编码集

在IntelliJ IDEA中设置Properties文件的编码格式&#xff0c;主要涉及以下步骤和注意事项&#xff1a; 1. ‌全局和项目编码设置‌ 打开设置界面&#xff1a;File -> Settings -> Editor -> File Encodings。在Global Encoding和Project Encoding下拉菜单中均选择UT…...

类魔方 :多变组合,灵活复用

文章目录 一、类的基础1. 类的基本结构与语法1. 类的定义与实例化2. 成员变量&#xff08;属性&#xff09;3. 构造函数&#xff08;Constructor&#xff09;4. 成员方法 2. 访问修饰符1. 基本访问规则2. 子类对父类方法的重写3. 构造函数的访问修饰符4. 参数属性与继承总结 3.…...

支持多方式拼接图片的软件

软件介绍 本文介绍一款名为 PicMerger 的图片拼接软件。 拼接亮点 PicMerger 这款软件最大的亮点在于&#xff0c;它能够将不同分辨率的图片完美地拼接在一起。拼接时会自动以分辨率最小的图片为标准&#xff0c;操作十分方便。 拼接方式与设置 该软件支持横向和纵向的拼接…...

Qt音视频开发过程中一个疑难杂症的解决方法/ffmpeg中采集本地音频设备无法触发超时回调

一、前言 最近在做实时音视频通话的项目中&#xff0c;遇到一个神奇的问题&#xff0c;那就是用ffmpeg采集本地音频设备&#xff0c;当音频设备拔掉后&#xff0c;采集过程会卡死在av_read_frame函数中&#xff0c;尽管设置了超时时间&#xff0c;也设置了超时回调interrupt_c…...

Android studio Could not move temporary workspace

Android studio Could not move temporary workspace 在Window上运行AS出现Could not move temporary workspace报错方法一&#xff08;有效&#xff09;方法二方法三方法四总结 在Window上运行AS出现Could not move temporary workspace报错 Could not move temporary workspa…...

深度估计中为什么需要已知相机基线(known camera baseline)?

在计算机视觉和立体视觉的上下文中&#xff0c;“已知相机基线”&#xff08;known camera baseline&#xff09;的解释 1. 相机基线的定义 相机基线是指两个相机中心之间的距离。在立体视觉系统中&#xff0c;通常有两个相机&#xff08;或一个相机在不同位置拍摄两张图像&a…...

Spring Cloud 技术实战

Spring Cloud 简介 Spring Cloud 是基于 Spring Boot 构建的微服务框架&#xff0c;提供了一套完整的微服务解决方案。它利用 Spring Boot 的开发便利性&#xff0c;并通过各种组件简化分布式系统的开发。 核心组件 Spring Cloud Netflix Eureka: 服务注册与发现Spring Clou…...

《云端共生体:Flutter与AR Cloud如何改写社交交互规则》

当Flutter遇上AR Cloud&#xff0c;一场关于社交应用跨设备增强现实内容共享与协作的变革正在悄然发生。 Flutter是谷歌推出的一款开源UI软件开发工具包&#xff0c;其最大的优势在于能够实现一套代码&#xff0c;多平台部署&#xff0c;涵盖iOS、Android、Web、Windows、macO…...

【数据结构】1-3 算法的时间复杂度

数据结构知识点合集&#xff1a;数据结构与算法 • 知识点 • 时间复杂度的定义 1、算法时间复杂度 事前预估算法时间开销T(n)与问题规模 n 的关系&#xff08;T 表示 “time”&#xff09; 2、语句频度 算法中语句的执行次数 对于以上算法&#xff0c;语句频度&#xff1a;…...

Science Robotics 封面论文:基于形态学开放式参数化的仿人灵巧手设计用于具身操作

人形机械手具有无与伦比的多功能性和精细运动技能&#xff0c;使其能够精确、有力和稳健地执行各种任务。在古生物学记录和动物王国中&#xff0c;我们看到了各种各样的替代手和驱动设计。了解形态学设计空间和由此产生的涌现行为不仅可以帮助我们理解灵巧的作用及其演变&#…...

Vue百日学习计划Day24-28天详细计划-Gemini版

总目标: 在 Day 24-27 熟练掌握 Vue.js 的各种模板语法&#xff0c;包括文本插值、属性绑定、条件渲染、列表渲染、事件处理和表单绑定&#xff0c;并能结合使用修饰符。 所需资源: Vue 3 官方文档 (模板语法): https://cn.vuejs.org/guide/essentials/template-syntax.htmlVu…...

C++_数据结构_哈希表(hash)实现

✨✨ 欢迎大家来到小伞的大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C学习 小伞的主页&#xff1a;xiaosan_blog 制作不易&#xff01;点个赞吧&#xff01;&#xff01;谢谢喵&#xff01;&…...

elasticsearch kibana ik 各版本下载

https://release.infinilabs.com/analysis-ik/stable/或者 https://github.com/infinilabs/analysis-ik/releases...

Uniapp 与 Uniapp X 对比:新手上手指南及迁移到 Uniapp X 的注意事项

文章目录 前言一、Uniapp 与 Uniapp X 核心区别二、Uniapp X 的核心优势三、新手学习 Uniapp X 必备技能栈3.1 基础技能要求3.2 平台相关知识3.3 工具链掌握 四、从 Uniapp 迁移到 Uniapp X 的注意事项4.1 语法转换&#xff1a;4.2 组件替换&#xff1a;4.3 状态管理&#xff1…...

SQL性能分析

查看数据库操作频次 使用SHOW GLOBAL STATUS LIKE Com_______; 指令&#xff0c;能查看当前数据库的INSERT、UPDATE、DELETE、SELECT访问频次 。若以查询为主&#xff0c;需重点优化查询相关性能&#xff0c;如索引&#xff1b;若以增删改为主&#xff0c;可考虑事务处理、批量…...

CANoe测试应用案例之A2L

写在前面 本系列文章主要讲解CANoe测试应用案例之A2L的相关知识&#xff0c;希望能帮助更多的同学认识和了解CANoe测试。 若有相关问题&#xff0c;欢迎评论沟通&#xff0c;共同进步。(*^▽^*) CANoe Option AMD/XCP支持加载A2L到CANoe中&#xff0c;方便ECU内部变量在功能验…...

H2数据库源码学习+debug, 数据库 sql、数据库引擎、数据库存储从此不再神秘

一、源码结构概览 H2源码采用标准Maven结构&#xff0c;核心模块在src/main/org/h2目录下&#xff1a; ├── command/ # SQL解析与执行 ├── engine/ # 数据库引擎核心&#xff08;会话、事务&#xff09; ├── table/ # 表结构定义与操作 ├── index/ # 索引实现&am…...

PopSQL:一个支持团队协作的SQL开发工具

PopSQL 是一款专为团队协作设计的现代化 SQL 编辑器&#xff0c;通过通团队过协作编写 SQL 查询、交互式可视化以及共享结果提升数据分析和管理效率。 PopSQL 提供了基于 Web 的在线平台以及跨系统&#xff08;Windows、macOS、Linux&#xff09;的桌面应用&#xff0c;包括免费…...

tomcat查看状态页及调优信息

准备工作 先准备一台已经安装好tomcat的虚拟机&#xff0c;tomcat默认是状态页是默认被禁用的 1.添加授权用户 vim /usr/local/tomcat/conf/tomcat-users.xml22 <role rolename"manager-gui"/>23 <user username"admin" password"tomcat&q…...

贝塞尔曲线原理

文章目录 一、 低阶贝塞尔曲线1.一阶贝塞尔曲线2. 二阶贝塞尔曲线3. 三阶贝塞尔曲线 一、 低阶贝塞尔曲线 1.一阶贝塞尔曲线 如下图所示&#xff0c; P 0 ​ P_0​ P0​​, P 1 ​ P_1​ P1​​ 是平面中的两点&#xff0c;则 B ( t ) B ( t ) B(t) 代表平面中的一段线段。…...

【MYSQL】笔记

&#x1f4da; 博主的专栏 &#x1f427; Linux | &#x1f5a5;️ C | &#x1f4ca; 数据结构 | &#x1f4a1;C 算法 | &#x1f152; C 语言 | &#x1f310; 计算机网络 在ubuntu中&#xff0c;改配置文件&#xff1a; sudo nano /etc/mysql/mysql.conf.d/mysq…...