找回密码
 立即注册
首页 业界区 业界 Seata源码—3.全局事务注解扫描器的初始化 ...

Seata源码—3.全局事务注解扫描器的初始化

巴沛若 2025-6-2 23:47:16
大纲
1.全局事务注解扫描器继承的父类与实现的接口
2.全局事务注解扫描器的核心变量
3.Spring容器初始化后初始化Seata客户端的源码
4.TM全局事务管理器客户端初始化的源码
5.TM组件的Netty网络通信客户端初始化源码
6.Seata框架的SPI动态扩展机制源码
7.向Seata客户端注册网络请求处理器的源码
8.Seata客户端的定时调度任务源码
9.Seata客户端初始化Netty Bootstrap的源码
10.Seata客户端的寻址机制与连接服务端的源码
11.RM分支事务资源管理器客户端初始化的源码
12.全局事务注解扫描器扫描Bean是否有Seata注解
13.Seata全局事务拦截器的创建和初始化
14.基于Spring AOP创建全局事务动态代理的源码
15.全局事务注解扫描器的初始化总结
 
如下的代码都是位于seata-spring模块下。
 
1.全局事务注解扫描器继承的父类与实现的接口
在dubbo-business.xml配置文件中,会引入全局事务注解扫描器GlobalTransactionScanner。
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.        xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
  5.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
  6.     <dubbo:application name="dubbo-demo-app">
  7.         <dubbo:parameter key="qos.enable" value="false"/>
  8.         <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
  9.         <dubbo:parameter key="qos.port" value="33333"/>
  10.     </dubbo:application>
  11.     <dubbo:registry address="zookeeper://localhost:2181" />
  12.     <dubbo:reference id="orderService" check="false" interface="io.seata.samples.dubbo.service.OrderService"/>
  13.     <dubbo:reference id="stockService" check="false" interface="io.seata.samples.dubbo.service.StockService"/>
  14.     <bean id="business" >
  15.         <property name="orderService" ref="orderService"/>
  16.         <property name="stockService" ref="stockService"/>
  17.     </bean>
  18.    
  19.    
  20.     <bean >
  21.         <constructor-arg value="dubbo-demo-app"/>
  22.         <constructor-arg value="my_test_tx_group"/>
  23.     </bean>
  24. </beans>
复制代码
全局事务注解扫描器GlobalTransactionScanner的继承父类和实现接口:
  1. 继承父类:AbstractAutoProxyCreator——Spring的动态代理自动创建者;
  2. 实现接口:ConfigurationChangeListener——关注配置变更事件的监听器;
  3. 实现接口:InitializingBean——Spring Bean的初始化回调;
  4. 实现接口:ApplicationContextAware——让Spring Bean获取到Spring容器;
  5. 实现接口:DisposableBean——支持可抛弃Bean;
复制代码
  1. //AbstractAutoProxyCreator:Spring的动态代理自动创建者
  2. //ConfigurationChangeListener:关注配置变更事件的监听器
  3. //InitializingBean:Spring Bean初始化回调
  4. //ApplicationContextAware:让Bean可以获取Spring容器
  5. //DisposableBean:支持可抛弃Bean
  6. public class GlobalTransactionScanner extends AbstractAutoProxyCreator
  7.         implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
  8.     ...
  9.     ...
  10. }
复制代码
 
2.全局事务注解扫描器的核心变量
(1)ConfigurationChangeListener接口
(2)InitializingBean接口
(3)ApplicationContextAware接口
(4)DisposableBean接口
(5)GlobalTransactionScanner核心变量
 
(1)ConfigurationChangeListener接口
实现了该接口的Bean,可以处理配置变更的事件。
  1. //实现了该ConfigurationChangeListener接口的Bean:
  2. //在发生配置变更事件时,可以进行相应的处理
  3. public interface ConfigurationChangeListener {
  4.     int CORE_LISTENER_THREAD = 1;
  5.     int MAX_LISTENER_THREAD = 1;
  6.    
  7.     //默认的线程池
  8.     ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
  9.         CORE_LISTENER_THREAD, MAX_LISTENER_THREAD,
  10.         Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
  11.         new NamedThreadFactory("configListenerOperate", MAX_LISTENER_THREAD)
  12.     );
  13.    
  14.     //处理配置变更的事件
  15.     void onChangeEvent(ConfigurationChangeEvent event);
  16.    
  17.     //配置变更事件的默认处理:获取默认的线程池来处理配置变更的事件
  18.     default void onProcessEvent(ConfigurationChangeEvent event) {
  19.         getExecutorService().submit(() -> {
  20.             //处理配置变更事件前的回调
  21.             beforeEvent();
  22.             //进行具体的配置变更事件处理
  23.             onChangeEvent(event);
  24.             //处理配置变更事件后的回调
  25.             afterEvent();
  26.         });
  27.     }
  28.     //关闭线程池
  29.     default void onShutDown() {
  30.         getExecutorService().shutdownNow();
  31.     }
  32.     //获取线程池
  33.     default ExecutorService getExecutorService() {
  34.         return EXECUTOR_SERVICE;
  35.     }
  36.     //处理配置变更事件前的默认回调
  37.     default void beforeEvent() {
  38.     }
  39.     //处理配置变更事件后的默认回调
  40.     default void afterEvent() {
  41.     }
  42. }
复制代码
(2)InitializingBean接口
实现了该接口的Bean,可以在初始化后进行回调。
  1. //实现了该InitializingBean接口的Bean:
  2. //它的所有properties属性被BeanFactory设置之后,
  3. //可以通过afterPropertiesSet()这个回调方法,来处理一些特殊的初始化操作
  4. public interface InitializingBean {
  5.     void afterPropertiesSet() throws Exception;
  6. }
复制代码
(3)ApplicationContextAware接口
实现了该接口的Bean,可以获取Spring容器。
  1. //实现了该ApplicationContextAware接口的Bean:
  2. //可以通过setApplicationContext()方法将Spring容器注入到这个Bean里面
  3. //注入 == set属性,代理 == wrap包装
  4. public interface ApplicationContextAware extends Aware {
  5.     void setApplicationContext(ApplicationContext applicationContext) throws BeansException;
  6. }
复制代码
(4)DisposableBean接口
实现了该接口的Bean,可以在Spring容器被销毁时进行相应的回调处理。
  1. //实现了该DisposableBean接口的Bean:
  2. //当Spring容器被销毁时,可以通过destroy()方法释放资源
  3. public interface DisposableBean {
  4.     void destroy() throws Exception;
  5. }
复制代码
(5)GlobalTransactionScanner核心变量
  1. //AbstractAutoProxyCreator:Spring的动态代理自动创建者
  2. //ConfigurationChangeListener:关注配置变更事件的监听器
  3. //InitializingBean:Spring Bean初始化回调
  4. //ApplicationContextAware:用来获取Spring容器
  5. //DisposableBean:支持可抛弃Bean
  6. public class GlobalTransactionScanner extends AbstractAutoProxyCreator
  7.         implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
  8.     private static final long serialVersionUID = 1L;
  9.     private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionScanner.class);
  10.     private static final int AT_MODE = 1;
  11.     private static final int MT_MODE = 2;
  12.     private static final int ORDER_NUM = 1024;
  13.     private static final int DEFAULT_MODE = AT_MODE + MT_MODE;
  14.     private static final String SPRING_TRANSACTION_INTERCEPTOR_CLASS_NAME = "org.springframework.transaction.interceptor.TransactionInterceptor";
  15.     private static final Set<String> PROXYED_SET = new HashSet<>();
  16.     private static final Set<String> EXCLUDE_BEAN_NAME_SET = new HashSet<>();
  17.     private static final Set<ScannerChecker> SCANNER_CHECKER_SET = new LinkedHashSet<>();
  18.     //Spring容器
  19.     private static ConfigurableListableBeanFactory beanFactory;
  20.     //Spring AOP里对方法进行拦截的拦截器
  21.     private MethodInterceptor interceptor;
  22.     //对添加了@GlobalTransactional注解的方法进行拦截的AOP拦截器
  23.     private MethodInterceptor globalTransactionalInterceptor;
  24.     //应用程序ID,在XML里配置时注入进来的
  25.     private final String applicationId;
  26.     //分布式事务组
  27.     private final String txServiceGroup;
  28.     //分布式事务模式,默认就是AT事务
  29.     private final int mode;
  30.     //与阿里云整合使用的,accessKey和secretKey是进行身份认证和安全访问时需要用到
  31.     private String accessKey;
  32.     private String secretKey;
  33.     //是否禁用全局事务,默认是false
  34.     private volatile boolean disableGlobalTransaction = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
  35.     //确保初始化方法仅仅调用一次的CAS变量
  36.     //通过Atomic CAS操作可以确保多线程并发下,方法只被调用一次
  37.     //只有一个线程可以成功对initialized原子变量进行CAS操作
  38.     private final AtomicBoolean initialized = new AtomicBoolean(false);
  39.     //全局事务失败时会有一个handler处理钩子
  40.     //比如当开启全局事务失败、提交全局事务失败、回滚全局事务失败、回滚重试全局事务失败时,都会在FailureHandler有相应的回调入口
  41.     private final FailureHandler failureHandlerHook;
  42.     //Spring容器
  43.     private ApplicationContext applicationContext;
  44.     ...
  45. }
复制代码
 
3.Spring容器初始化完触发Seata客户端初始化
Spring容器启动和初始化完毕后,会调用InitializingBean的afterPropertiesSet()方法进行回调。
 
GlobalTransactionScanner.afterPropertiesSet()方法会调用initClient()方法,并且会通过CAS操作确保initClient()方法仅执行一次。
 
initClient()方法是全局事务注解扫描器GlobalTransactionScanner的核心方法,它会负责对Seata客户端进行初始化。
 
对于Seata客户端来说,有两个重要的组件:一个是TM(即Transaction Manager)全局事务管理器,另一个是RM(即Resource Manager)分支事务资源管理器。
 
在initClient()方法中,会先调用TMClient的init()方法对TM全局事务管理器客户端进行初始化,然后调用RMClient的init()方法对RM分支事务资源管理器客户端进行初始化。
1.png
  1. //AbstractAutoProxyCreator:Spring的动态代理自动创建者
  2. //ConfigurationChangeListener:关注配置变更事件的监听器
  3. //InitializingBean:Spring Bean初始化回调
  4. //ApplicationContextAware:让Bean可以获取Spring容器
  5. //DisposableBean:支持可抛弃Bean
  6. public class GlobalTransactionScanner extends AbstractAutoProxyCreator
  7.         implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
  8.     ...
  9.     //确保初始化方法仅仅调用一次的CAS变量
  10.     //通过Atomic CAS操作可以确保多线程并发下,方法只被调用一次
  11.     //只有一个线程可以成功对initialized原子变量进行CAS操作
  12.     private final AtomicBoolean initialized = new AtomicBoolean(false);
  13.     ...
  14.     public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) {
  15.         setOrder(ORDER_NUM);
  16.         //启用对目标class创建动态代理
  17.         setProxyTargetClass(true);
  18.         //设置应用程序ID
  19.         this.applicationId = applicationId;
  20.         //设置分布式事务服务分组
  21.         this.txServiceGroup = txServiceGroup;
  22.         //设置分布式事务模式,默认是AT
  23.         this.mode = mode;
  24.         //设置全局事务失败回调钩子
  25.         this.failureHandlerHook = failureHandlerHook;
  26.     }
  27.    
  28.     //DisposableBean接口的回调方法
  29.     //当Spring容器被销毁、系统停止时,所做的一些资源销毁和释放
  30.     @Override
  31.     public void destroy() {
  32.         ShutdownHook.getInstance().destroyAll();
  33.     }
  34.     //InitializingBean接口的回调方法
  35.     //Spring容器启动和初始化完毕后,会调用如下的afterPropertiesSet()方法进行回调
  36.     @Override
  37.     public void afterPropertiesSet() {
  38.         //是否禁用了全局事务,默认是false
  39.         if (disableGlobalTransaction) {
  40.             if (LOGGER.isInfoEnabled()) {
  41.                 LOGGER.info("Global transaction is disabled.");
  42.             }
  43.             ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
  44.             return;
  45.         }
  46.         //通过CAS操作确保initClient()初始化动作仅仅执行一次
  47.         if (initialized.compareAndSet(false, true)) {
  48.             //initClient()方法会对Seata Client进行初始化,比如和Seata Server建立长连接
  49.             //seata-samples的业务服务、订单服务、库存服务、账号服务的spring.xml配置文件里都配置了GlobalTransactionScanner这个Bean
  50.             //而GlobalTransactionScanner这个Bean伴随着Spring容器的初始化完毕,都会回调其初始化逻辑initClient()
  51.             initClient();
  52.         }
  53.     }
  54.    
  55.     //initClient()是核心方法,负责对Seata Client客户端进行初始化
  56.     private void initClient() {
  57.         if (LOGGER.isInfoEnabled()) {
  58.             LOGGER.info("Initializing Global Transaction Clients ... ");
  59.         }
  60.       
  61.         //对于Seata Client来说,最重要的组件有两个:
  62.         //一个是TM,即Transaction Manager,全局事务管理器
  63.         //一个是RM,即Resource Manager,分支事务资源管理器
  64.       
  65.         //init TM
  66.         //TMClient.init()会对TM全局事务管理器客户端进行初始化
  67.         TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
  68.         if (LOGGER.isInfoEnabled()) {
  69.             LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
  70.         }
  71.       
  72.         //init RM
  73.         //RMClient.init()会对RM分支事务资源管理器客户端进行初始化
  74.         RMClient.init(applicationId, txServiceGroup);
  75.         if (LOGGER.isInfoEnabled()) {
  76.             LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
  77.         }
  78.         if (LOGGER.isInfoEnabled()) {
  79.             LOGGER.info("Global Transaction Clients are initialized. ");
  80.         }
  81.       
  82.         //注册Spring容器被销毁时的回调钩子,释放TM和RM两个组件的一些资源
  83.         registerSpringShutdownHook();
  84.     }
  85.    
  86.     private void registerSpringShutdownHook() {
  87.         if (applicationContext instanceof ConfigurableApplicationContext) {
  88.             ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
  89.             ShutdownHook.removeRuntimeShutdownHook();
  90.         }
  91.         ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
  92.         ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
  93.     }
  94.     ...
  95. }
复制代码
 
4.TM全局事务管理器客户端初始化的源码
TM全局事务管理器在进行初始化之前,会先通过TmNettyRemotingClient的getInstance()方法获取TM组件的Netty网络通信客户端实例,该方法使用了Double Check双重检查机制。
 
对TM组件的Netty网络通信客户端实例TmNettyRemotingClient进行实例化时,会传入一个创建好的Netty网络通信客户端配置实例NettyClientConfig,以及一个创建好的线程池messageExecutor。
  1. public class TMClient {
  2.     public static void init(String applicationId, String transactionServiceGroup) {
  3.         init(applicationId, transactionServiceGroup, null, null);
  4.     }
  5.    
  6.     public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
  7.         //获取TM组件的Netty网络通信客户端实例
  8.         TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
  9.         tmNettyRemotingClient.init();
  10.     }
  11. }
  12. public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
  13.     ...
  14.     public static TmNettyRemotingClient getInstance() {
  15.         //Java并发编程里经典的Double Check
  16.         if (instance == null) {
  17.             synchronized (TmNettyRemotingClient.class) {
  18.                 if (instance == null) {
  19.                     //创建一个NettyClientConfig,作为Netty网络通信客户端的配置
  20.                     NettyClientConfig nettyClientConfig = new NettyClientConfig();
  21.                     //创建一个线程池
  22.                     final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
  23.                         nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
  24.                         KEEP_ALIVE_TIME, TimeUnit.SECONDS,
  25.                         new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
  26.                         new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(), nettyClientConfig.getClientWorkerThreads()),
  27.                         RejectedPolicies.runsOldestTaskPolicy()
  28.                     );
  29.                     instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
  30.                 }
  31.             }
  32.         }
  33.         return instance;
  34.     }
  35.    
  36.     private TmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor) {
  37.         super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);
  38.         //安全认证signer数字签名组件
  39.         //EnhancedServiceLoader对一个接口进行加载,类似于Seata SPI机制
  40.         this.signer = EnhancedServiceLoader.load(AuthSigner.class);
  41.     }
  42.     ...
  43. }
  44. public class NettyClientConfig extends NettyBaseConfig {
  45.     private int connectTimeoutMillis = 10000;//连接超时时间
  46.     private int clientSocketSndBufSize = 153600;//客户端Socket发送Buffer大小
  47.     private int clientSocketRcvBufSize = 153600;//客户端Socket接收Buffer大小
  48.     private int clientWorkerThreads = WORKER_THREAD_SIZE;//客户端工作线程
  49.     private final Class<? extends Channel> clientChannelClazz = CLIENT_CHANNEL_CLAZZ;//客户端Channel类
  50.     private int perHostMaxConn = 2;//每个host的最大连接数
  51.     private static final int PER_HOST_MIN_CONN = 2;
  52.     private int pendingConnSize = Integer.MAX_VALUE;
  53.     private static final long RPC_RM_REQUEST_TIMEOUT = CONFIG.getLong(ConfigurationKeys.RPC_RM_REQUEST_TIMEOUT, DEFAULT_RPC_RM_REQUEST_TIMEOUT);
  54.     private static final long RPC_TM_REQUEST_TIMEOUT = CONFIG.getLong(ConfigurationKeys.RPC_TM_REQUEST_TIMEOUT, DEFAULT_RPC_TM_REQUEST_TIMEOUT);
  55.     private static String vgroup;
  56.     private static String clientAppName;
  57.     private static int clientType;
  58.     private static int maxInactiveChannelCheck = 10;
  59.     private static final int MAX_NOT_WRITEABLE_RETRY = 2000;
  60.     private static final int MAX_CHECK_ALIVE_RETRY = 300;
  61.     private static final int CHECK_ALIVE_INTERVAL = 10;
  62.     private static final String SOCKET_ADDRESS_START_CHAR = "/";
  63.     private static final long MAX_ACQUIRE_CONN_MILLS = 60 * 1000L;
  64.     private static final String RPC_DISPATCH_THREAD_PREFIX = "rpcDispatch";
  65.     private static final int DEFAULT_MAX_POOL_ACTIVE = 1;
  66.     private static final int DEFAULT_MIN_POOL_IDLE = 0;
  67.     private static final boolean DEFAULT_POOL_TEST_BORROW = true;
  68.     private static final boolean DEFAULT_POOL_TEST_RETURN = true;
  69.     private static final boolean DEFAULT_POOL_LIFO = true;
  70.     private static final boolean ENABLE_CLIENT_BATCH_SEND_REQUEST = CONFIG.getBoolean(ConfigurationKeys.ENABLE_CLIENT_BATCH_SEND_REQUEST, DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST);
  71.     ...
  72. }
  73. public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
  74.     ...
  75.     private final NettyClientBootstrap clientBootstrap;
  76.     private NettyClientChannelManager clientChannelManager;
  77.     ...
  78.     public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
  79.         super(messageExecutor);
  80.         this.transactionRole = transactionRole;
  81.         clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
  82.         clientBootstrap.setChannelHandlers(new ClientHandler());
  83.         clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
  84.     }
  85.     ...
  86. }
复制代码
 
15.全局事务注解扫描器的初始化总结
全局事务注解扫描器GlobalTransactionScanner的初始化主要做了如下三项工作:
一.初始化TM全局事务管理器客户端
二.初始化RM分支事务资源管理器客户端
三.对添加了Seata相关注解的Bean创建全局事务动态代理
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册