三、TransactionalTemplate处理全局事务
所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
上一篇文章中,我们看了一下GlobalTransactionalInterceptor这个拦截器,知道了@GlobalTransactional注解的主体逻辑被委托给了TransactionalTemplate来实现。
本文,将看一下TransationalTemplate这个模板方法处理全局事务的主体逻辑。
execute方法
首先,我们跟进TransactionalTemplate的execute方法
public Object execute(TransactionalExecutor business) throws Throwable { // 1. 获取或者创建一个全局事务 GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.1 获取事务信息 TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { // 2. 开始全局事务 beginTransaction(txInfo, tx); Object rs = null; try { // 执行业务逻辑 rs = business.execute(); } catch (Throwable ex) { // 3.rollback全局事务 completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 4. commit全局事务 commitTransaction(tx); return rs; } finally { //5. 清理 triggerAfterCompletion(); cleanUp(); } }
execute方法的逻辑我们应该非常的熟悉,这和JDBC的API非常的相似。同样是经历:begin -> commit || rollback,这样一个逻辑。
步骤主要分为如下几个:
1)获取或者创建一个全局事务;
2)begin全局事务;
3)异常rollback事务;
4)正常commit事务;
下面,我们将逐步阅读对应步骤的代码
getCurrentOrCreate
GlobalTransactionContext作为一个上下文,提供了获取或者创建全局事务的方法,跟进它的getCurrentOrCreate方法
public static GlobalTransaction getCurrentOrCreate() { GlobalTransaction tx = getCurrent(); // 如果获取不到现有的全局事务,那么创建一个 if (tx == null) { return createNew(); } return tx; }
我们,先看看它是如何获取到已经存在的全局事务的,跟进getCurrent方法
private static GlobalTransaction getCurrent() { // 获取全局事务的XID String xid = RootContext.getXID(); // XID不存在,表示不存在全局事务 if (xid == null) { return null; } // 否则以参与者的身份加入全局事务中 return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant); }
显然,判断是否存在于全局事务中是根据传递而来的XID是否存在而决定的。我们跟进getXID看看从哪里获取XID
private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load(); public static String getXID() { String xid = CONTEXT_HOLDER.get(KEY_XID); if (StringUtils.isNotBlank(xid)) { return xid; } String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE); if (StringUtils.isNotBlank(xidType) && xidType.indexOf("_") > -1) { return xidType.split("_")[0]; } return null; }
可以看到,CONTEXT_HOLDER是一个静态对象,通过load方法加载的。跟进load,我们看一下加载逻辑
private static class ContextCoreHolder { private static ContextCore instance; static { // SPI机制加载自定义配置 ContextCore contextCore = EnhancedServiceLoader.load(ContextCore.class); if (contextCore == null) { // 默认采用ThreadLocal实现 contextCore = new ThreadLocalContextCore(); } instance = contextCore; } } public static ContextCore load() { return ContextCoreHolder.instance; }
seata没有直接采用JAVA提供的SPI机制,而是自己写了一套。通过load方法加载自定义实现,不过默认是选择ThreadLocal来实现的。也就是说,在同一个线程中可以直接获取XID,不同线程中通过set到threadLocal中实现传递。
总得来说,只要有XID,那么就表示已经存在全局事务。
我们再回到getCurrentOrCreate方法中
public static GlobalTransaction getCurrentOrCreate() { GlobalTransaction tx = getCurrent(); // 如果获取不到现有的全局事务,那么创建一个 if (tx == null) { return createNew(); } return tx; }
如果XID不存在,那么tx就会为null,这时候将会创建初始的GlobalTransaction。跟进createNew方法看看创建过程
private static GlobalTransaction createNew() { GlobalTransaction tx = new DefaultGlobalTransaction(); return tx; }
创建过程非常简单,直接new了一个默认实现类。那么,DefaultGlobalTransaction的构造方法有没有做什么?
DefaultGlobalTransaction() { // status是unknow表示初始状态,role是launcher表示发起者 this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher); }
可以看到,和加入一个事务不同,创建一个新的事务状态是unknow而不是begin,角色是创建人而不是参与者。
到这里我们应该明白,分布式事务中XID的传递显得非常重要,这样你才能把不同机器中的本地事务关联到全局事务当中,而如果存在XID丢失的问题就比较可怕了。
beginTransaction
经过getCurrentOrCreate以后,我们已经获得了一个新的或者既有的全局事务。在业务逻辑执行之前,我们需要先做一次begin。跟进beginTransaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeBegin(); // 调用的是全局事务的begin方法 tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } }
继续跟进DefaultGlobalTransaction的begin方法
public void begin(int timeout, String name) throws TransactionException { // 参与者不做后续处理 if (role != GlobalTransactionRole.Launcher) { return; } // 省略 // 调用TransactionManager,begin全局事务 xid = transactionManager.begin(null, null, name, timeout); // status从UNKNOW变成BEGIN status = GlobalStatus.Begin; // 绑定XID到threadLocal RootContext.bind(xid); }
可以看到,参与者这里被return了,所以全局事务只会begin一次。调用TransactionManager以后会生成一个XID标识这个全局事务,并绑定到context当中,状态变成BEGIN。
我们跟进TransactionManager的begin方法,看看做了啥
@Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 构建一个begin请求 GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); // 同步请求seata的服务端 GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } // 获取服务端的XID return response.getXid(); }
TransactionManager将发起一个请求到服务端,由服务端来生成一个XID。可见,开启一个全局事务会在服务端生成一份全局事务的信息。这时候全局事务处于begin状态了
commitTransaction
业务逻辑处理完毕,将由commitTransaction来提交全局事务。跟进commitTransaction方法看看
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeCommit(); // 提交全局事务 tx.commit(); triggerAfterCommit(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); } }
和begin一样,将调用GlobalTransaction的commit方法,跟进commit方法
@Override public void commit() throws TransactionException { if (role == GlobalTransactionRole.Participant) { return; } // 省略 int retry = COMMIT_RETRY_COUNT; try { while (retry > 0) { try { // 提交该XID的全局事务 status = transactionManager.commit(xid); break; } catch (Throwable ex) { retry--; if (retry == 0) { throw new TransactionException("Failed to report global commit", ex); } } } } finally { if (RootContext.getXID() != null) { // 解绑XID if (xid.equals(RootContext.getXID())) { RootContext.unbind(); } } } }
GlobalTransaction也是调用了TransactionManager的commit方法,finally块将解绑XID,可见如果到finally部分基本上算是结束了。而while部分做了一些重试操作。
再看看TransactionManager的commit方法
@Override public GlobalStatus commit(String xid) throws TransactionException { // 全局事务提交的请求 GlobalCommitRequest globalCommit = new GlobalCommitRequest(); globalCommit.setXid(xid); // 同步提交全局事务提交请求 GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit); return response.getGlobalStatus(); }
请求Server端,响应回来的status将作为本地全局事务的status。如果一切顺利,到GlobalTransaction的commit这里就已经结束了。
completeTransactionAfterThrowing
标准的事务逻辑除了commit还存在rollback的逻辑,当业务逻辑执行异常的时候要进行rollback回滚操作。
我们跟进completeTransactionAfterThrowing方法看看
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException { // 判断是否回滚 if (txInfo != null && txInfo.rollbackOn(ex)) { try { // 回滚事务 rollbackTransaction(tx, ex); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, ex); } } else { // 如果不需要回滚,直接commit commitTransaction(tx); } }
可以看到,并不是所有异常都需要回滚的,txInfo包含的回滚规则会判断该异常是否回滚,如果不需要回滚直接commit全局事务。
跟进rollbackTransaction看看回滚操作
private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException { triggerBeforeRollback(); // 调用GlobalTransaction的rollback操作 tx.rollback(); triggerAfterRollback(); // 抛出一个回滚完成的执行异常 throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex); }
首先,和begin、commit一样,rollback也是调用了GlobalTransaction的方法。但是要注意,这里rollback完成以后会抛出一个RollbackDone的ExecutionException异常。
ExecutionException将会被GlobalTransactionalInterceptor的handleGlobalTransaction方法给捕捉到,我们看看捕捉的位置
try { // 省略 } catch (TransactionalExecutor.ExecutionException e) { TransactionalExecutor.Code code = e.getCode(); switch (code) { case RollbackDone: throw e.getOriginalException(); case BeginFailure: failureHandler.onBeginFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case CommitFailure: failureHandler.onCommitFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case RollbackFailure: failureHandler.onRollbackFailure(e.getTransaction(), e.getCause()); throw e.getCause(); default: throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code); } }
可以看到,匹配到done以后,就会把原始的异常抛出。除了done以外,还有其它对应的状态处理。
回到rollbackTransaction方法
private void rollbackTransaction(GlobalTransaction tx, Throwable ex) throws TransactionException, TransactionalExecutor.ExecutionException { triggerBeforeRollback(); // 调用GlobalTransaction的rollback操作 tx.rollback(); triggerAfterRollback(); // 抛出一个回滚完成的执行异常 throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex); }
跟进rollback
@Override public void rollback() throws TransactionException { if (role == GlobalTransactionRole.Participant) { return; } int retry = ROLLBACK_RETRY_COUNT; try { // 重试 while (retry > 0) { try { // 向Server端发起rollback操作 status = transactionManager.rollback(xid); break; } catch (Throwable ex) { retry--; if (retry == 0) { throw new TransactionException("Failed to report global rollback", ex); } } } } finally { if (RootContext.getXID() != null) { // 解绑XID if (xid.equals(RootContext.getXID())) { RootContext.unbind(); } } } }
可以看到,和commit的逻辑几乎一致。向Server端发起全局事务的rollback操作,finally块最终解绑XID来结束这个全局事务。
总结
到这里,TransactionalTemplate的主体逻辑就结束了。其实逻辑并不复杂,主要就是遵循了begin->commit || rollback这个规范。与Server端的交互交给了TransactionManager来负责。
不过,靠这样一个简单的逻辑并不能够实现分布式事务,还有很多保证事务一致性等机制需要后续去了解清楚的。
本文就先到这里~