前言

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

对于Seata不太了解的朋友,可以看下我之前写的文章:

TCC模式

TCC模式怎么理解

TCC(Try-Confirm-Cancel)实际上是服务化的两阶段提交协议,业务开发者需要实现这三个服务接口,第一阶段服务由业务代码编排来调用 Try 接口进行资源预留,所有参与者的 Try 接口都成功了,事务管理器会提交事务,并调用每个参与者的 Confirm 接口真正提交业务操作,否则调用每个参与者的Cancel接口回滚事务。

Seata 框架把每组 TCC 接口当做一个 Resource,称为 TCC Resource。这套 TCC 接口可以是 RPC,也以是服务内 JVM 调用。在业务启动时,Seata 框架会自动扫描识别到 TCC 接口的调用方和发布方。如果是 RPC 的话,就是 sofa:reference、sofa:service、dubbo:reference、dubbo:service 等。

扫描到 TCC 接口的调用方和发布方之后。如果是发布方,会在业务启动时向 TC 注册 TCC Resource,与 DataSource Resource 一样,每个资源也会带有一个资源 ID。

如果是调用方,Seata 框架会给调用方加上切面,与 AT 模式一样,在运行时,该切面会拦截所有对 TCC 接口的调用。每调用一次 Try 接口,切面会先向 TC 注册一个分支事务,然后才去执行原来的 RPC 调用。当请求链路调用完成后,TC 通过分支事务的资源 ID 回调到正确的参与者去执行对应 TCC 资源的 Confirm 或 Cancel 方法。

如何设计和异常控制

TCC 模式需要用户根据自己的业务场景实现 Try、Confirm 和 Cancel 三个操作;事务发起方在一阶段执行 Try 方式,在二阶段提交执行 Confirm 方法,二阶段回滚执行 Cancel 方法。

TCC 三个方法描述:

  • Try:资源的检测和预留;
  • Confirm:执行的业务操作提交;要求 Try 成功 Confirm 一定要能成功;
  • Cancel:预留资源释放;

业务模型分2阶段设计

用户接入 TCC ,最重要的是考虑如何将自己的业务模型拆成两阶段来实现。

以“扣钱”场景为例,在接入 TCC 前,对 A 账户的扣钱,只需一条更新账户余额的 SQL 便能完成;但是在接入 TCC 之后,用户就需要考虑如何将原来一步就能完成的扣钱操作,拆成两阶段,实现成三个方法,并且保证一阶段 Try 成功的话 二阶段 Confirm 一定能成功。

如上图所示,

Try 方法作为一阶段准备方法,需要做资源的检查和预留。在扣钱场景下,Try 要做的事情是就是检查账户余额是否充足,预留转账资金,预留的方式就是冻结 A 账户的 转账资金。Try 方法执行之后,账号 A 余额虽然还是 100,但是其中 30 元已经被冻结了,不能被其他事务使用。

二阶段 Confirm 方法执行真正的扣钱操作。Confirm 会使用 Try 阶段冻结的资金,执行账号扣款。Confirm 方法执行之后,账号 A 在一阶段中冻结的 30 元已经被扣除,账号 A 余额变成 70 元 。

如果二阶段是回滚的话,就需要在 Cancel 方法内释放一阶段 Try 冻结的 30 元,使账号 A 的回到初始状态,100 元全部可用。

用户接入 TCC 模式,最重要的事情就是考虑如何将业务模型拆成 2 阶段,实现成 TCC 的 3 个方法,并且保证 Try 成功 Confirm 一定能成功。相对于 AT 模式,TCC 模式对业务代码有一定的侵入性,但是 TCC 模式无 AT 模式的全局行锁,TCC 性能会比 AT 模式高很多。

TCC设计 – 允许空回滚

首先是空回滚。什么是空回滚?空回滚就是对于一个分布式事务,在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成

什么样的情形会造成空回滚呢?可以看图中的第 2 步,前面讲过,注册分支事务是在调用 RPC 时,Seata 框架的切面会拦截到该次调用请求,先向 TC 注册一个分支事务,然后才去执行 RPC 调用逻辑。如果 RPC 调用逻辑有问题,比如调用方机器宕机、网络异常,都会造成 RPC 调用失败,即未执行 Try 方法。但是分布式事务已经开启了,需要推进到终态,因此,TC 会回调参与者二阶段 Cancel 接口,从而形成空回滚。

那会不会有空提交呢?理论上来说不会的,如果调用方宕机,那分布式事务默认是回滚的。如果是网络异常,那 RPC 调用失败,发起方应该通知 TC 回滚分布式事务,这里可以看出为什么是理论上的,就是说发起方可以在 RPC 调用失败的情况下依然通知 TC 提交,这时就会发生空提交,这种情况要么是编码问题,要么开发同学明确知道需要这样做。

那怎么解决空回滚呢?前面提到,Cancel 要识别出空回滚,直接返回成功。那关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。因此,需要一张额外的事务控制表,其中有分布式事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

TCC设计 – 幂等控制

接下来是幂等。幂等就是对于同一个分布式事务的同一个分支事务,重复去调用该分支事务的第二阶段接口,因此,要求 TCC 的二阶段 Confirm 和 Cancel 接口保证幂等,不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致资损等严重问题。

什么样的情形会造成重复提交或回滚?从图中可以看到,提交或回滚是一次 TC 到参与者的网络调用。因此,网络故障、参与者宕机等都有可能造成参与者 TCC 资源实际执行了二阶段防范,但是 TC 没有收到返回结果的情况,这时,TC 就会重复调用,直至调用成功,整个分布式事务结束。

怎么解决重复执行的幂等问题呢?一个简单的思路就是记录每个分支事务的执行状态。在执行前状态,如果已执行,那就不再执行;否则,正常执行。前面在讲空回滚的时候,已经有一张事务控制表了,事务控制表的每条记录关联一个分支事务,那我们完全可以在这张事务控制表上加一个状态字段,用来记录每个分支事务的执行状态。

如图所示,该状态字段有三个值,分别是初始化、已提交、已回滚。Try 方法插入时,是初始化状态。二阶段 Confirm 和 Cancel 方法执行后修改为已提交或已回滚状态。当重复调用二阶段接口时,先获取该事务控制表对应记录,检查状态,如果已执行,则直接返回成功;否则正常执行。

TCC设计 – 防悬挂

最后是防悬挂。按照惯例,咱们来先讲讲什么是悬挂。悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。因为允许空回滚的原因,Cancel 接口认为 Try 接口没执行,空回滚直接返回成功,对于 Seata 框架来说,认为分布式事务的二阶段接口已经执行成功,整个分布式事务就结束了。但是这之后 Try 方法才真正开始执行,预留业务资源,前面提到事务并发控制的业务加锁,对于一个 Try 方法预留的业务资源,只有该分布式事务才能使用,然而 Seata 框架认为该分布式事务已经结束,也就是说,当出现这种情况时,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。

什么样的情况会造成悬挂呢?按照前面所讲,在 RPC 调用时,先注册分支事务,再执行 RPC 调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,发起方就会通知 TC 回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者,真正执行,从而造成悬挂。

怎么实现才能做到防悬挂呢?根据悬挂出现的条件先来分析下,悬挂是指二阶段 Cancel 执行完后,一阶段才执行。也就是说,为了避免悬挂,如果二阶段执行完成,那一阶段就不能再继续执行。因此,当一阶段执行时,需要先检查二阶段是否已经执行完成,如果已经执行,则一阶段不再执行;否则可以正常执行。那怎么检查二阶段是否已经执行呢?大家是否想到了刚才解决空回滚和幂等时用到的事务控制表,可以在二阶段执行时插入一条事务控制记录,状态为已回滚,这样当一阶段执行时,先读取该记录,如果记录存在,就认为二阶段已经执行;否则二阶段没执行。

Dubbo + Seata 实战案例

关于环境准备和目录结构,大家详见: 微服务痛点 – 基于Dubbo + Seata的分布式事务(AT)模式

业务模型两阶段改造

Storage商品库存

  1. 数据库添加冻结商品库存数
# 创建商品库存表
create table if not exists storage.tcc_storage
(
    id bigint auto_increment
        primary key,
    commodity_code varchar(50) null comment '商品编码',
    name varchar(255) null comment '商品名称',
    count int null comment '商品库存数',
    frozen_count int default 0 null comment '冻结商品库存数'
);
  1. 将原来的扣减商品库存一步逻辑修改成两阶段逻辑操作:
package cn.mushuwei.storage.api;

import cn.mushuwei.storage.api.dto.CommodityDTO;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

/**
 * @author jamesmsw
 * @date 2020/12/1 9:37 上午
 */
@LocalTCC
public interface StorageApi {


    /**
     * 扣减库存准备
     *
     * @param actionContext 业务动作上下文
     * @param commodityDTO 库存信息
     * @return 是/否
     */
    @TwoPhaseBusinessAction(name = "decreaseStorageTcc", commitMethod = "decreaseStorageCommit", rollbackMethod = "decreaseStorageCancel")
    Boolean decreaseStoragePrepare(BusinessActionContext actionContext,
                                   @BusinessActionContextParameter(paramName = "commdityDTO") CommodityDTO commodityDTO);

    /**
     * 扣减库存提交
     *
     * @param actionContext 业务动作上下文
     * @return 是/否
     */
    Boolean decreaseStorageCommit(BusinessActionContext actionContext);

    /**
     * 扣减库存回滚
     *
     * @param actionContext 业务动作上下文
     * @return 是/否
     */
    Boolean decreaseStorageCancel(BusinessActionContext actionContext);
}

  1. 数据持久化操作逻辑修改:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.mushuwei.storage.dao.StorageDao">

    <update id="tccDecreaseStoragePrepare">
        update tcc_storage set count = count - #{count},
                           frozen_count = frozen_count + #{count}
        where commodity_code = #{commodityCode}
    </update>

    <update id="tccDecreaseStorageCommit">
        update tcc_storage set frozen_count = frozen_count - #{count}
        where commodity_code = #{commodityCode}
    </update>

    <update id="tccDecreaseStorageCancel">
        update tcc_storage set count = count + #{count},
                           frozen_count = frozen_count - #{count}
        where commodity_code = #{commodityCode}
    </update>
</mapper>

Account用户

  1. **数据库添加账号冻结余额: **
# 创建用户账户表
create table if not exists tcc_account
(
	id bigint auto_increment
		primary key,
	user_id varchar(50) null comment '用户编号',
	amount double(50,2) null comment '账号余额',
	frozen_amount double(50,2) default 0.00 null comment '账号冻结余额'
);
  1. 将原来的扣减用户余额一步逻辑修改成两阶段逻辑操作:
package cn.mushuwei.account.api;

import cn.mushuwei.account.api.dto.AccountDTO;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

/**
 * @author jamesmsw
 * @date 2020/12/1 5:20 下午
 */
@LocalTCC
public interface AccountApi {


    /**
     * 从账号扣钱准备
     *
     * @param accountDTO
     * @param actionContext 业务动作上下文
     * @return 是/否
     */
    @TwoPhaseBusinessAction(name = "decreaseAccountTcc", commitMethod = "decreaseAccountCommit", rollbackMethod = "decreaseAccountCancel")
    Boolean decreaseAccountPrepare(BusinessActionContext actionContext,
                                   @BusinessActionContextParameter(paramName = "accountDTO") AccountDTO accountDTO);

    /**
     * 从账号扣钱提交
     *
     * @param actionContext
     * @return 是/否
     */
    Boolean decreaseAccountCommit(BusinessActionContext actionContext);


    /**
     * 从账号扣钱取消
     *
     * @param actionContext
     * @return 是/否
     */
    Boolean decreaseAccountCancel(BusinessActionContext actionContext);
}

  1. 数据持久化操作逻辑修改:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.mushuwei.account.dao.AccountDao">

    <!--    以下是tcc模式所需的数据库操作 -->
    <update id="tccDecreaseAccountPrepare">
        update tcc_account set amount = amount - #{amount},
                           frozen_amount = frozen_amount + #{amount}
        where user_id = #{userId}
    </update>

    <update id="tccDecreaseAccountCommit">
        update tcc_account set frozen_amount = frozen_amount - #{amount}
        where user_id = #{userId}
    </update>

    <update id="tccDecreaseAccountCancel">
        update tcc_account set amount = amount + #{amount},
                           frozen_amount = frozen_amount - #{amount}
        where user_id = #{userId}
    </update>

</mapper>

Order订单

  1. 数据库添加订单创建状态:
create table if not exists `order`.tcc_order
(
    id bigint auto_increment
        primary key,
    order_no varchar(100) null comment '订单号',
    user_id varchar(50) null comment '用户编号',
    code varchar(100) null comment '商品编码',
    count int null comment '商品数量',
    amount double(50,2) null comment '消费总金额',
    status tinyint null comment '状态,1-预创建;2-创建成功;3-创建失败'
);
  1. 将原来的创建订单一步逻辑修改成两阶段逻辑操作:
package cn.mushuwei.order.api;

import cn.mushuwei.order.api.dto.OrderDTO;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

/**
 * @author jamesmsw
 * @date 2020/12/1 5:57 下午
 */
@LocalTCC
public interface OrderApi {


    /**
     * 创建订单准备
     *
     * @param orderDTO
     * @param actionContext 业务动作上下文
     * @return
     */
    @TwoPhaseBusinessAction(name = "createOrderTcc", commitMethod = "createOrderCommit", rollbackMethod = "createOrderCancel")
    Boolean createOrderPrepare(BusinessActionContext actionContext,
                               @BusinessActionContextParameter(paramName = "orderDTO") OrderDTO orderDTO);

    /**
     * 创建订单提交
     *
     * @param actionContext 业务动作上下文
     * @return
     */
    Boolean createOrderCommit(BusinessActionContext actionContext);

    /**
     * 创建订单取消
     *
     * @param actionContext 业务动作上下文
     * @return
     */
    Boolean createOrderCancel(BusinessActionContext actionContext);
}

  • TwoPhaseBusinessAction注解标记这是个TCC接口,同时指定commitMethod,rollbackMethod的名称BusinessActionContext是TCC事务中的上下文对象 BusinessActionContextParameter注解标记的参数会在上下文中传播,即能通过BusinessActionContext对象在commit方法及cancle方法中取到该参数值

  • RM 的接口上面必须要有@LocalTCC 注解,且必须在接口上面

  1. 数据持久化操作逻辑修改:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.mushuwei.order.dao.OrderDao">

    <insert id="tccCreateOrderPrepare" keyProperty="id" useGeneratedKeys="true"
            parameterType="cn.mushuwei.order.entity.OrderDO">
        insert into `tcc_order` (order_no,
            user_id,
            code,
            count,
            amount,
            status)
        VALUES (#{order.orderNo},
                #{order.userId},
                #{order.code},
                #{order.count},
                #{order.amount},
                #{order.status})
    </insert>

    <update id="tccCreateOrderCommitOrCancel">
        update `tcc_order` set status = #{status}
        where order_no = #{orderNo}
    </update>

</mapper>

演示

启动Dubbo、Seata、MySQ并初始化数据, 使各服务应用注册到Seata上

  • Dubbo、Seata和MySQL服务
mushuwei@mushuweideMacBook-Pro-2 seata % docker ps
CONTAINER ID        IMAGE                  COMMAND                  CREATED             STATUS              PORTS                                                  NAMES
0c9c325a039c        mysql:latest           "docker-entrypoint.s…"   2 weeks ago         Up 7 minutes        0.0.0.0:3306->3306/tcp, 33060/tcp                      mysql5.7
b8031fa865cd        seataio/seata-server   "java -Djava.securit…"   2 weeks ago         Up 20 seconds       0.0.0.0:8091->8091/tcp                                 seata_seata-server_1
2af927368a15        apache/dubbo-admin     "java -XX:+UnlockExp…"   2 weeks ago         Up 2 hours          0.0.0.0:8080->8080/tcp                                 dubbo_admin_1
7afec07234c9        zookeeper              "/docker-entrypoint.…"   2 weeks ago         Up 2 hours          2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   dubbo_zookeeper_1
  • 初始化数据
mysql> use storage;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from tcc_storage;
+----+----------------+------+-------+--------------+
| id | commodity_code | name | count | frozen_count |
+----+----------------+------+-------+--------------+
|  1 | cola           | ???? |  2000 |            0 |
+----+----------------+------+-------+--------------+
1 row in set (0.00 sec)

mysql> use account;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from tcc_account;
+----+---------+---------+---------------+
| id | user_id | amount  | frozen_amount |
+----+---------+---------+---------------+
|  1 | user123 | 1250.00 |          0.00 |
+----+---------+---------+---------------+
1 row in set (0.00 sec)

mysql> use order;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from tcc_order;
Empty set (0.00 sec)
  • 启动Storage、Account、Order和Business

  • Seata上各应用的注册情况
Starting seata_seata-server_1 ... done
Attaching to seata_seata-server_1
seata-server_1  | [0.001s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/var/log/seata_gc.log instead.
seata-server_1  | [0.015s][info   ][gc] Using G1
seata-server_1  | [0.841s][info   ][gc] GC(0) Pause Young (Normal) (G1 Evacuation Pause) 14M->4M(32M) 11.654ms
seata-server_1  | SLF4J: A number (18) of logging calls during the initialization phase have been intercepted and are
seata-server_1  | SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
seata-server_1  | SLF4J: See also http://www.slf4j.org/codes.html#replay
seata-server_1  | 08:16:30.938  INFO --- [                     main] io.seata.server.Server                   : The server is running in container.
seata-server_1  | 08:16:30.972  INFO --- [                     main] io.seata.config.FileConfiguration        : The file name of the operation is registry
seata-server_1  | 08:16:30.980  INFO --- [                     main] io.seata.config.FileConfiguration        : The configuration file used is /seata-server/resources/registry.conf
seata-server_1  | [1.385s][info   ][gc] GC(1) Pause Young (Normal) (G1 Evacuation Pause) 15M->6M(32M) 14.280ms
seata-server_1  | 08:16:31.221  INFO --- [                     main] io.seata.config.FileConfiguration        : The file name of the operation is file.conf
seata-server_1  | 08:16:31.222  INFO --- [                     main] io.seata.config.FileConfiguration        : The configuration file used is file.conf
seata-server_1  | WARNING: An illegal reflective access operation has occurred
seata-server_1  | WARNING: Illegal reflective access by net.sf.cglib.core.ReflectUtils$2 (file:/seata-server/libs/cglib-3.1.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
seata-server_1  | WARNING: Please consider reporting this to the maintainers of net.sf.cglib.core.ReflectUtils$2
seata-server_1  | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
seata-server_1  | WARNING: All illegal access operations will be denied in a future release
seata-server_1  | [1.734s][info   ][gc] GC(2) Pause Young (Normal) (G1 Evacuation Pause) 16M->7M(32M) 6.400ms
seata-server_1  | [2.101s][info   ][gc] GC(3) Pause Young (Normal) (G1 Evacuation Pause) 18M->7M(32M) 4.828ms
seata-server_1  | 08:16:31.924  INFO --- [                     main] i.s.core.rpc.netty.NettyServerBootstrap  : Server started, listen port: 8091
seata-server_1  | 08:26:12.007  INFO --- [rverHandlerThread_1_1_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/storage', applicationId='seata-action-storage', transactionServiceGroup='service_tx_group'},channel:[id: 0xae1ea1b1, L:/172.20.0.2:8091 - R:/172.20.0.1:52380],client version:1.3.0
seata-server_1  | 08:26:12.080  INFO --- [rverHandlerThread_1_2_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/storage', applicationId='seata-action-storage', transactionServiceGroup='service_tx_group'},channel:[id: 0xae1ea1b1, L:/172.20.0.2:8091 - R:/172.20.0.1:52380],client version:1.3.0
seata-server_1  | 08:26:33.704  INFO --- [rverHandlerThread_1_3_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/account', applicationId='seata-action-account', transactionServiceGroup='service_tx_group'},channel:[id: 0xd949a994, L:/172.20.0.2:8091 - R:/172.20.0.1:52396],client version:1.3.0
seata-server_1  | 08:26:33.758  INFO --- [rverHandlerThread_1_4_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/account', applicationId='seata-action-account', transactionServiceGroup='service_tx_group'},channel:[id: 0xd949a994, L:/172.20.0.2:8091 - R:/172.20.0.1:52396],client version:1.3.0
seata-server_1  | 08:26:57.466  INFO --- [rverHandlerThread_1_5_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/order', applicationId='seata-action-order', transactionServiceGroup='service_tx_group'},channel:[id: 0xfd51f88b, L:/172.20.0.2:8091 - R:/172.20.0.1:52412],client version:1.3.0
seata-server_1  | 08:26:57.518  INFO --- [rverHandlerThread_1_6_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/order', applicationId='seata-action-order', transactionServiceGroup='service_tx_group'},channel:[id: 0xfd51f88b, L:/172.20.0.2:8091 - R:/172.20.0.1:52412],client version:1.3.0
seata-server_1  | 08:27:10.600  INFO --- [ettyServerNIOWorker_1_4_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-storage', transactionServiceGroup='service_tx_group'},channel:[id: 0x0e0b6c24, L:/172.20.0.2:8091 - R:/172.20.0.1:52424],client version:1.3.0
seata-server_1  | 08:27:32.694  INFO --- [ettyServerNIOWorker_1_5_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-account', transactionServiceGroup='service_tx_group'},channel:[id: 0x2fd20474, L:/172.20.0.2:8091 - R:/172.20.0.1:52432],client version:1.3.0
seata-server_1  | 08:27:56.453  INFO --- [ettyServerNIOWorker_1_6_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-order', transactionServiceGroup='service_tx_group'},channel:[id: 0xc8f6ba94, L:/172.20.0.2:8091 - R:/172.20.0.1:52436],client version:1.3.0
seata-server_1  | 08:28:15.847  INFO --- [rverHandlerThread_1_7_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='null', applicationId='seata-action-business', transactionServiceGroup='service_tx_group'},channel:[id: 0x9ef75d68, L:/172.20.0.2:8091 - R:/172.20.0.1:52444],client version:1.3.0
seata-server_1  | 08:28:15.863  INFO --- [ettyServerNIOWorker_1_7_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-business', transactionServiceGroup='service_tx_group'},channel:[id: 0x2b6c19d5, L:/172.20.0.2:8091 - R:/172.20.0.1:52440],client version:1.3.0

检查各服务Service在Dubbo上的情况

正常流程-模拟用户下单,看下各应用的二阶段提交日志

  • 执行business模块test/java目录下的business.http文件,对接口发起请求
POST localhost:8084/business/buy
Content-Type: application/json

{
  "userId" : "user123",
  "commodityCode" : "cola",
  "count" : 2,
  "amount" : 5.0
}
  • 各数据库数据变化
mysql> use storage;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from storage;
+----+----------------+------+-------+
| id | commodity_code | name | count |
+----+----------------+------+-------+
|  1 | cola           | ???? |  1998 |
+----+----------------+------+-------+
1 row in set (0.00 sec)

mysql> use account;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from account;
+----+---------+---------+
| id | user_id | amount  |
+----+---------+---------+
|  1 | user123 | 1245.00 |
+----+---------+---------+
1 row in set (0.00 sec)

mysql> use order;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> select * from order;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'order' at line 1
mysql> select * from `order`;
+----+----------------------------------+---------+------+-------+--------+
| id | order_no                         | user_id | code | count | amount |
+----+----------------------------------+---------+------+-------+--------+
|  5 | dbde6ebfd72b4ad5aeba67d67ade6894 | user123 | cola |     2 |   5.00 |
+----+----------------------------------+---------+------+-------+--------+
1 row in set (0.00 sec)

  • 各应用下二阶段提交情况,下面日志以Storage应用为例
2020-12-28 17:59:40.778  INFO 28287 --- [ctor_RMROLE_1_1] io.seata.rm.AbstractRMHandler            : the rm client received response msg [version=1.5.0-SNAPSHOT,extraData=null,identified=true,resultCode=null,msg=null] from tc server.
2020-12-28 17:59:40.783 DEBUG 28287 --- [:20881-thread-2] c.m.s.d.S.tccDecreaseStoragePrepare      : ==>  Preparing: update tcc_storage set count = count - ?, frozen_count = frozen_count + ? where commodity_code = ?
2020-12-28 17:59:40.822 DEBUG 28287 --- [:20881-thread-2] c.m.s.d.S.tccDecreaseStoragePrepare      : ==> Parameters: 2(Integer), 2(Integer), cola(String)
2020-12-28 17:59:40.830 DEBUG 28287 --- [:20881-thread-2] c.m.s.d.S.tccDecreaseStoragePrepare      : <==    Updates: 1
2020-12-28 17:59:41.662  INFO 28287 --- [h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:xid=172.20.0.2:8091:86882407747166208,branchId=86882409689128960,branchType=TCC,resourceId=decreaseStorageTcc,applicationData={"actionContext":{"action-start-time":1609149580502,"sys::prepare":"decreaseStoragePrepare","sys::rollback":"decreaseStorageCancel","sys::commit":"decreaseStorageCommit","commdityDTO":{"commodityCode":"cola","count":2},"host-name":"172.17.54.171","actionName":"decreaseStorageTcc"}}
2020-12-28 17:59:41.664  INFO 28287 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch committing: 172.20.0.2:8091:86882407747166208 86882409689128960 decreaseStorageTcc {"actionContext":{"action-start-time":1609149580502,"sys::prepare":"decreaseStoragePrepare","sys::rollback":"decreaseStorageCancel","sys::commit":"decreaseStorageCommit","commdityDTO":{"commodityCode":"cola","count":2},"host-name":"172.17.54.171","actionName":"decreaseStorageTcc"}}
2020-12-28 17:59:41.675 DEBUG 28287 --- [h_RMROLE_1_1_16] c.m.s.d.S.tccDecreaseStorageCommit       : ==>  Preparing: update tcc_storage set frozen_count = frozen_count - ? where commodity_code = ?
2020-12-28 17:59:41.676 DEBUG 28287 --- [h_RMROLE_1_1_16] c.m.s.d.S.tccDecreaseStorageCommit       : ==> Parameters: 2(Integer), cola(String)
2020-12-28 17:59:41.681 DEBUG 28287 --- [h_RMROLE_1_1_16] c.m.s.d.S.tccDecreaseStorageCommit       : <==    Updates: 1
2020-12-28 17:59:41.704  INFO 28287 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractResourceManager      : TCC resource commit result : true, xid: 172.20.0.2:8091:86882407747166208, branchId: 86882409689128960, resourceId: decreaseStorageTcc
2020-12-28 17:59:41.705  INFO 28287 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

异常流程-模拟用户下单,看下各应用的二阶段提交日志

  • 修改BusinessServiceImpl类,并重启
 private boolean flag;

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "seata-demo-business")
    public Boolean handleBusiness(BusinessDTO businessDTO) {
        flag = false;
        log.info("开始全局事务,XID = " + RootContext.getXID());
        CommodityDTO commodityDTO = new CommodityDTO();
        commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
        commodityDTO.setCount(businessDTO.getCount());
        boolean storageResult =  storageApi.decreaseStorage(commodityDTO);

        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setUserId(businessDTO.getUserId());
        orderDTO.setCommodityCode(businessDTO.getCommodityCode());
        orderDTO.setOrderCount(businessDTO.getCount());
        orderDTO.setOrderAmount(businessDTO.getAmount());
        boolean orderResult = orderApi.createOrder(orderDTO);

        //打开注释测试事务发生异常后,全局回滚功能
        if (!flag) {
            throw new RuntimeException("测试抛异常后,分布式事务回滚!");
        }

        if (!storageResult || !orderResult) {
            throw new RuntimeException("失败");
        }
        return true;
    }
  • 执行business模块test/java目录下的business.http文件,对接口发起请求
POST http://localhost:8084/business/buy

HTTP/1.1 500 
Content-Type: application/json
Transfer-Encoding: chunked
Date: Mon, 21 Dec 2020 08:46:24 GMT
Connection: close

{
  "timestamp": "2020-12-21T08:46:24.678+00:00",
  "status": 500,
  "error": "Internal Server Error",
  "message": "",
  "path": "/business/buy"
}
  • 各数据库数据变化

无任何变化

  • 各应用下二阶段提交情况,下面日志以Storage应用为例
2020-12-28 18:04:12.125  WARN 28287 --- [erverWorker-3-1] o.a.d.remoting.transport.AbstractServer  :  [DUBBO] All clients has disconnected from /172.17.54.171:20881. You can graceful shutdown now., dubbo version: 2.7.8, current host: 172.17.54.171
2020-12-28 18:04:12.126  INFO 28287 --- [erverWorker-3-1] o.a.d.r.t.netty4.NettyServerHandler      :  [DUBBO] The connection of /172.17.54.171:53500 -> /172.17.54.171:20881 is disconnected., dubbo version: 2.7.8, current host: 172.17.54.171
2020-12-28 18:05:00.344  INFO 28287 --- [erverWorker-3-2] o.a.d.r.t.netty4.NettyServerHandler      :  [DUBBO] The connection of /172.17.54.171:54267 -> /172.17.54.171:20881 is established., dubbo version: 2.7.8, current host: 172.17.54.171
2020-12-28 18:05:13.544  INFO 28287 --- [:20881-thread-5] c.m.storage.provider.StorageApiImpl      : commodityDTO: CommodityDTO(id=null, commodityCode=cola, name=null, count=2), actionContext: [xid:172.20.0.2:8091:86883805524140032,branch_Id:86883805897433088,action_name:decreaseStorageTcc,action_context:{action-start-time=1609149913535, sys::prepare=decreaseStoragePrepare, sys::rollback=decreaseStorageCancel, sys::commit=decreaseStorageCommit, commdityDTO=CommodityDTO(id=null, commodityCode=cola, name=null, count=2), host-name=172.17.54.171, actionName=decreaseStorageTcc}]
2020-12-28 18:05:13.557 DEBUG 28287 --- [:20881-thread-5] c.m.s.d.S.tccDecreaseStoragePrepare      : ==>  Preparing: update tcc_storage set count = count - ?, frozen_count = frozen_count + ? where commodity_code = ?
2020-12-28 18:05:13.558 DEBUG 28287 --- [:20881-thread-5] c.m.s.d.S.tccDecreaseStoragePrepare      : ==> Parameters: 2(Integer), 2(Integer), cola(String)
2020-12-28 18:05:13.562 DEBUG 28287 --- [:20881-thread-5] c.m.s.d.S.tccDecreaseStoragePrepare      : <==    Updates: 1
2020-12-28 18:05:13.782  INFO 28287 --- [h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=172.20.0.2:8091:86883805524140032,branchId=86883805897433088,branchType=TCC,resourceId=decreaseStorageTcc,applicationData={"actionContext":{"action-start-time":1609149913535,"sys::prepare":"decreaseStoragePrepare","sys::rollback":"decreaseStorageCancel","sys::commit":"decreaseStorageCommit","commdityDTO":{"commodityCode":"cola","count":2},"host-name":"172.17.54.171","actionName":"decreaseStorageTcc"}}
2020-12-28 18:05:13.784  INFO 28287 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.20.0.2:8091:86883805524140032 86883805897433088 decreaseStorageTcc
2020-12-28 18:05:13.789 DEBUG 28287 --- [h_RMROLE_1_2_16] c.m.s.d.S.tccDecreaseStorageCancel       : ==>  Preparing: update tcc_storage set count = count + ?, frozen_count = frozen_count - ? where commodity_code = ?
2020-12-28 18:05:13.789 DEBUG 28287 --- [h_RMROLE_1_2_16] c.m.s.d.S.tccDecreaseStorageCancel       : ==> Parameters: 2(Integer), 2(Integer), cola(String)
2020-12-28 18:05:13.793 DEBUG 28287 --- [h_RMROLE_1_2_16] c.m.s.d.S.tccDecreaseStorageCancel       : <==    Updates: 1
2020-12-28 18:05:13.815  INFO 28287 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractResourceManager      : TCC resource rollback result : true, xid: 172.20.0.2:8091:86883805524140032, branchId: 86883805897433088, resourceId: decreaseStorageTcc
2020-12-28 18:05:13.815  INFO 28287 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

以上代码,我已经上传到GitHub中了,大家详见: https://github.com/sanshengshui/seata-dubbo-action,TCC模式在TCC分支上。

到此,基于Dubbo + Seata的分布式事务已经讲解完毕。

参考文章

版权声明:本文为sanshengshui原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/sanshengshui/p/14204867.html