TX-LCN分布式事务之TCC模式
TCC
模式是TX-LCN
分布式事务模式的一种,T-try
-尝试执行业务、C-confirm
-确认执行业务、
C-cancel
-取消执行业务
# TX-LCN分布式事务之TCC模式
# 什么是TCC模式
TCC
模式是TX-LCN
分布式事务模式的一种,T-try
-尝试执行业务、C-confirm
-确认执行业务、
C-cancel
-取消执行业务
# 原理
TCC
事务机制相对于传统事务机制(X/Open XA Two-Phase-Commit
),其特征在于它不依赖资源管理器(RM
)对XA
的支持,
而是通过对(由业务系统提供的)业务逻辑的调度来实现分布式事务。主要由三步操作,Try
: 尝试执行业务、 Confirm
:确认执行业务、 Cancel
: 取消执行业务。
# 模式特点
- 该模式对代码的嵌入性高,要求每个业务需要写三种步骤的操作。
- 该模式对有无本地事务控制都可以支持使用面广。
- 数据一致性控制几乎完全由开发者控制,对业务开发难度要求高。
# 源码解读
首先我们来看几个关键的类TransactionAspect
事务切面类、DTXLogicWeaver
分布式事务调度器、DTXServiceExecutor
分布式事务执行器
# TransactionAspect 作用
- 源码
@Aspect
@Component
public class TransactionAspect implements Ordered {
private static final Logger log = LoggerFactory.getLogger(TransactionAspect.class);
private final TxClientConfig txClientConfig;
private final DTXLogicWeaver dtxLogicWeaver;
public TransactionAspect(TxClientConfig txClientConfig, DTXLogicWeaver dtxLogicWeaver) {
this.txClientConfig = txClientConfig;
this.dtxLogicWeaver = dtxLogicWeaver;
}
@Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.TccTransaction)")
public void tccTransactionPointcut() {
}
@Around("tccTransactionPointcut() && !lcnTransactionPointcut()&& !txcTransactionPointcut() && !txTransactionPointcut()")
public Object runWithTccTransaction(ProceedingJoinPoint point) throws Throwable {
DTXInfo dtxInfo = DTXInfo.getFromCache(point);
TccTransaction tccTransaction = (TccTransaction)dtxInfo.getBusinessMethod().getAnnotation(TccTransaction.class);
dtxInfo.setTransactionType("tcc");
dtxInfo.setTransactionPropagation(tccTransaction.propagation());
DTXLogicWeaver var10000 = this.dtxLogicWeaver;
point.getClass();
return var10000.runTransaction(dtxInfo, point::proceed);
}
public int getOrder() {
return this.txClientConfig.getDtxAspectOrder();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
由该类的源码,我们能够明白,通过解析
@TccTransaction
注解进行相应的操作。代码会调用到DTXLogicWeaver
类
# DTXLogicWeaver 作用
public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {
if (Objects.isNull(DTXLocalContext.cur())) {
DTXLocalContext.getOrNew();
log.debug("<---- TxLcn start ---->");
DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
TxContext txContext;
if (this.globalContext.hasTxContext()) {
txContext = this.globalContext.txContext();
dtxLocalContext.setInGroup(true);
log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
} else {
txContext = this.globalContext.startTx();
}
if (Objects.nonNull(dtxLocalContext.getGroupId())) {
dtxLocalContext.setDestroy(false);
}
dtxLocalContext.setUnitId(dtxInfo.getUnitId());
dtxLocalContext.setGroupId(txContext.getGroupId());
dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());
TxTransactionInfo info = new TxTransactionInfo();
info.setBusinessCallback(business);
info.setGroupId(txContext.getGroupId());
info.setUnitId(dtxInfo.getUnitId());
info.setPointMethod(dtxInfo.getBusinessMethod());
info.setPropagation(dtxInfo.getTransactionPropagation());
info.setTransactionInfo(dtxInfo.getTransactionInfo());
info.setTransactionType(dtxInfo.getTransactionType());
info.setTransactionStart(txContext.isDtxStart());
boolean var15 = false;
Object var6;
try {
var15 = true;
var6 = this.transactionServiceExecutor.transactionRunning(info);
var15 = false;
} finally {
if (var15) {
if (dtxLocalContext.isDestroy()) {
synchronized(txContext.getLock()) {
txContext.getLock().notifyAll();
}
if (!dtxLocalContext.isInGroup()) {
this.globalContext.destroyTx();
}
DTXLocalContext.makeNeverAppeared();
TracingContext.tracing().destroy();
}
log.debug("<---- TxLcn end ---->");
}
}
if (dtxLocalContext.isDestroy()) {
synchronized(txContext.getLock()) {
txContext.getLock().notifyAll();
}
if (!dtxLocalContext.isInGroup()) {
this.globalContext.destroyTx();
}
DTXLocalContext.makeNeverAppeared();
TracingContext.tracing().destroy();
}
log.debug("<---- TxLcn end ---->");
return var6;
} else {
return business.call();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
以上代码是该类的核心逻辑,可以看出来
TX-LCN
事务的处理全部都是走的这个类的该方法,最终会调用到DTXServiceExecutor
分布式事务执行器
# DTXServiceExecutor 作用
/**
* 事务业务执行
*
* @param info info
* @return Object
* @throws Throwable Throwable
*/
public Object transactionRunning(TxTransactionInfo info) throws Throwable {
// 1. 获取事务类型
String transactionType = info.getTransactionType();
// 2. 获取事务传播状态
DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);
// 2.1 如果不参与分布式事务立即终止
if (propagationState.isIgnored()) {
return info.getBusinessCallback().call();
}
// 3. 获取本地分布式事务控制器
DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);
// 4. 织入事务操作
try {
// 4.1 记录事务类型到事务上下文
Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
transactionTypeSet.add(transactionType);
dtxLocalControl.preBusinessCode(info);
// 4.2 业务执行前
txLogger.txTrace(
info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);
// 4.3 执行业务
Object result = dtxLocalControl.doBusinessCode(info);
// 4.4 业务执行成功
txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
dtxLocalControl.onBusinessCodeSuccess(info, result);
return result;
} catch (TransactionException e) {
txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
throw e;
} catch (Throwable e) {
// 4.5 业务执行失败
txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
"business code error");
dtxLocalControl.onBusinessCodeError(info, e);
throw e;
} finally {
// 4.6 业务执行完毕
dtxLocalControl.postBusinessCode(info);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
通过以上代码可以看出,该类是整个事务执行关键类。
以上就是TCC模式比较核心的代码,其他的分支代码就不一一赘述了。可以看到和LCN模式有一些代码是重复的,整体就是通过解析不同的注解走不同的事务模式
# 实战
由第一篇分布式事务之TX-LCN 我们规划了俩个TC
分别是lcn-order
服务和lcn-pay
服务,我们的思路是订单服务调用支付服务,分别在订单服务表t_order
和支付服务表t_pay
中插入插入数据。
# 订单服务核心代码和数据表脚本
- 代码
/**
* @author:triumphxx
* @Date:2021/10/24
* @Time:2:13 下午
* @微信公众号:北漂码农有话说
* @网站:http://blog.triumphxx.com.cn
* @GitHub https://github.com/triumphxx
* @Desc:
**/
@RestController
public class TccOrderController {
@Autowired
TOrderDao tOrderDao;
@Autowired
private RestTemplate restTemplate;
//保存主键信息
private static Map<String,Integer> maps = new HashMap<>();
@PostMapping("/add-order-tcc")
@Transactional(rollbackFor = Exception.class)
@TccTransaction
public String add(){
TOrder bean = new TOrder();
Long no = Math.round((Math.random()+1) * 1000);
bean.setTId(no.intValue());
bean.setTName("order"+no.intValue());
JSONObject date = new JSONObject();
date.put("tId",bean.getTId());
date.put("tName",bean.getTName()+"pay");
restTemplate.postForEntity("http://lcn-pay/add-pay-tcc",date,String.class);
// int i = 1/0;
tOrderDao.insert(bean);
maps.put("a",no.intValue());
System.out.println("本次新增订单号为" + no.intValue());
return "新增订单成功";
}
public String confirmAdd(){
System.out.println("order confirm ");
System.out.println("订单新增成功 id为:"+maps.get("a"));
maps.clear();
return "新增订单成功";
}
/**
* 逆sql
* @param
* @return
*/
public String cancelAdd(){
Integer a = maps.get("a");
System.out.println("order cancel ");
System.out.println("删除的订单号为 :"+a);
tOrderDao.deleteByPrimaryKey(a);
maps.clear();
return "新增订单成功";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
代码解读:若使用TCC模式只需要在你的方法上添加注解@TccTransaction,相对于lcn模式,我们可以看到多出来很多代码,所以tcc模式的数据一致性依赖于开发者来进行控制,对于开发要求高, 需要注意的是confirm的操作,方法名必须是confirm开头,cancel操作同样的道理,在confirm中我们可以做一些业务执行 成功后的操作,cancel操作的是try的逆操作,若try是新增数据,那么cancel就是删除数据。注意代码中有一个map的数据结果, 就是进行try、confirm、cancel操作业务的唯一标识进行相应的操作。
- 脚本
CREATE TABLE `t_order` (
`t_id` int(11) NOT NULL,
`t_name` varchar(45) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
2
3
4
# 支付服务核心代码和数据表脚本
- 代码
/**
* @author:triumphxx
* @Date:2021/10/24
* @Time:2:26 下午
* @微信公众号:北漂码农有话说
* @网站:http://blog.triumphxx.com.cn
* @GitHub https://github.com/triumphxx
* @Desc:
**/
@RestController
public class TccPayController {
@Autowired
TPayDao tPayDao;
//保存主键信息
private static Map<String,Integer> maps = new HashMap<>();
@PostMapping("/add-pay-tcc")
@Transactional(rollbackFor = Exception.class)
@TccTransaction
public String addPay(@RequestBody Map map){
TPay tPay = new TPay();
tPay.setTId((Integer) map.get("tId"));
tPay.setTName((String) map.get("tName"));
tPayDao.insert(tPay);
maps.put("a",(Integer) map.get("tId"));
// int i = 1/0;
System.out.println("本次新增支付号为" + (Integer) map.get("tId"));
return "新增支付成功";
}
public String confirmAddPay(Map map){
System.out.println("pay confirm");
System.out.println("本次新增支付号为" + maps.get("a"));
maps.clear();
return "新增支付成功";
}
/**
* 逆sql
* @param map
* @return
*/
public String cancelAddPay(Map map){
Integer a = maps.get("a");
System.out.println("pay cancel");
System.out.println("本次删除支付号为" + maps.get("a"));
tPayDao.deleteByPrimaryKey(a);
maps.clear();
return "取消支付成功";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
- 脚本
CREATE TABLE `t_pay` (
`t_id` int(11) NOT NULL,
`t_name` varchar(45) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
2
3
4
# 测试流程
- 启动
Redis
- 启动
TM
- 启动注册中心
eureka-server
- 启动服务
lcn-order
- 启动服务
lcn-pay
- 请求接口
http://localhost:8001/add-order
- 代码搞一个异常看数据是否进行回滚
# 小结
本篇我们分析了TX-LCN
分布式事务的TCC
模式的原理及相关源码,以及搭建服务的进行测试。希望能对大家有所帮助。
源码地址源码传送门