高可用-分布式事务实战
在分布式系统中,分布式事务是最具挑战性的技术难题之一。
当一个业务操作需要跨越多个服务、多个数据库时,如何保证数据的一致性?如何在性能和一致性之间取得平衡?
本文将深入探讨分布式事务的理论基础、主流解决方案以及实战经验。
# 一、分布式事务的挑战
# 1、为什么需要分布式事务?
在单体应用时代,我们依赖数据库的本地事务来保证 ACID 特性:
@Transactional
public void transfer(Long fromId, Long toId, BigDecimal amount) {
accountDao.debit(fromId, amount); // 扣款
accountDao.credit(toId, amount); // 入账
}
但在微服务架构下,一个业务操作可能涉及多个服务:
订单服务 → 库存服务 → 积分服务 → 支付服务
典型场景:
- 电商下单: 创建订单 + 扣减库存 + 扣减优惠券 + 扣款
- 转账业务: A账户扣款 + B账户入账 + 记录流水
- 支付回调: 更新订单状态 + 增加积分 + 发送通知
# 2、分布式事务的核心挑战
| 挑战 | 描述 | 影响 |
|---|---|---|
| 网络不可靠 | 网络分区、超时、丢包 | 无法保证原子性 |
| 部分失败 | 某些服务成功,某些失败 | 数据不一致 |
| 性能开销 | 协调开销、锁等待 | 系统吞吐量下降 |
| 复杂度高 | 需要考虑重试、补偿、幂等 | 开发和维护成本高 |
| 运维困难 | 故障排查、事务状态追踪 | 问题定位困难 |
# 3、ACID 在分布式环境的困境
传统数据库事务的 ACID 特性在分布式环境下面临挑战:
- 原子性(Atomicity): 跨服务的操作如何保证全部成功或全部失败?
- 一致性(Consistency): 多个数据源如何保持一致?
- 隔离性(Isolation): 分布式环境下的并发控制如何实现?
- 持久性(Durability): 已提交的事务如何保证不丢失?
# 二、分布式事务理论基础
# 1、CAP 理论与分布式事务
回顾 CAP 理论(详见高可用-分布式基础之CAP理论):
- C (Consistency): 强一致性
- A (Availability): 高可用性
- P (Partition tolerance): 分区容错性
在分布式事务中,P 是必须保证的,真正的选择是在 C 和 A 之间权衡:
| 选择 | 特点 | 适用场景 | 代表方案 |
|---|---|---|---|
| CP | 强一致性,可能阻塞 | 金融、支付 | 2PC、3PC |
| AP | 最终一致性,高可用 | 电商、社交 | TCC、Saga |
# 2、BASE 理论
BASE 是对 CAP 中 AP 方案的补充:
- Basically Available: 基本可用
- Soft state: 软状态
- Eventually consistent: 最终一致性
核心思想: 通过牺牲强一致性来获得可用性,在一定时间窗口后达到最终一致。
# 3、分布式事务的一致性模型
| 一致性级别 | 描述 | 实现难度 | 性能 | 适用场景 |
|---|---|---|---|---|
| 强一致性 | 事务完成后立即一致 | 高 | 低 | 金融交易 |
| 最终一致性 | 一定时间后达到一致 | 中 | 高 | 电商订单 |
| 因果一致性 | 有因果关系的操作保持顺序 | 中 | 中 | 社交评论 |
| 弱一致性 | 不保证何时一致,甚至可能不一致 | 低 | 最高 | 浏览量统计 |
# 三、两阶段提交协议 (2PC)
# 1、2PC 基本原理
两阶段提交是最经典的强一致性分布式事务协议,包含两个阶段:
# 阶段一: 准备阶段 (Prepare)
协调者 (Coordinator) 参与者 (Participant)
| |
|-------- Prepare ------------->|
| | 执行事务但不提交
| | 写 Undo/Redo 日志
|<------- Yes/No ---------------|
- 协调者向所有参与者发送
Prepare请求 - 参与者执行事务操作,写入 Undo 和 Redo 日志
- 参与者向协调者返回
Yes(准备成功) 或No(准备失败)
# 阶段二: 提交阶段 (Commit)
情况1: 所有参与者都返回 Yes
协调者 参与者
| |
|-------- Commit -------------->|
| | 正式提交事务
|<------- ACK ------------------|
情况2: 任一参与者返回 No
协调者 参与者
| |
|-------- Rollback ------------>|
| | 回滚事务
|<------- ACK ------------------|
# 2、2PC 代码示例
协调者实现:
public class TwoPhaseCommitCoordinator {
private List<Participant> participants;
public boolean executeTransaction(Transaction tx) {
// 阶段一: 准备阶段
List<Boolean> prepareResults = new ArrayList<>();
for (Participant p : participants) {
try {
boolean prepared = p.prepare(tx);
prepareResults.add(prepared);
} catch (Exception e) {
prepareResults.add(false);
}
}
// 判断是否所有参与者都准备成功
boolean allPrepared = prepareResults.stream()
.allMatch(result -> result);
// 阶段二: 提交或回滚
if (allPrepared) {
// 所有参与者都准备好,执行提交
for (Participant p : participants) {
p.commit(tx);
}
return true;
} else {
// 有参与者准备失败,执行回滚
for (Participant p : participants) {
p.rollback(tx);
}
return false;
}
}
}
参与者实现:
public class DatabaseParticipant implements Participant {
private Connection connection;
@Override
public boolean prepare(Transaction tx) {
try {
// 开启事务
connection.setAutoCommit(false);
// 执行 SQL 操作
executeSql(tx.getSqlStatements());
// 写入 Undo/Redo 日志 (数据库自动完成)
// 返回准备成功
return true;
} catch (Exception e) {
return false;
}
}
@Override
public void commit(Transaction tx) {
try {
connection.commit();
} catch (SQLException e) {
// 记录日志
log.error("Commit failed", e);
}
}
@Override
public void rollback(Transaction tx) {
try {
connection.rollback();
} catch (SQLException e) {
log.error("Rollback failed", e);
}
}
}
# 3、2PC 的问题
# 3.1、同步阻塞问题
所有参与者在准备阶段后都处于阻塞状态,等待协调者的指令:
参与者 A: [准备完成] → 等待... → 等待... → 等待...
参与者 B: [准备完成] → 等待... → 等待... → 等待...
参与者 C: [准备失败] → 立即返回
协调者收到 C 的失败响应后才能发出回滚指令
期间 A、B 一直持有资源锁,无法处理其他请求
影响: 系统吞吐量低,并发能力差。
# 3.2、单点故障问题
协调者是单点,如果在阶段二之前崩溃:
协调者发送 Prepare 后崩溃
→ 所有参与者永久等待
→ 资源被锁定,无法释放
解决方案:
- 协调者持久化事务状态
- 参与者设置超时机制
- 引入备份协调者
# 3.3、数据不一致问题
如果协调者在阶段二发送 Commit 时,部分参与者收到指令,部分未收到:
协调者: 发送 Commit → [网络分区] → 崩溃
参与者 A: 收到 Commit → 提交成功 ✓
参与者 B: 未收到消息 → 超时回滚 ✗
结果: A 提交了事务,B 回滚了事务,数据不一致。
# 4、2PC 的应用场景
| 场景 | 是否适用 | 原因 |
|---|---|---|
| 金融转账 | ✓ | 要求强一致性,可接受性能损失 |
| 库存扣减 | ✗ | 高并发场景,阻塞问题严重 |
| 订单创建 | ✗ | 涉及多个服务,协调复杂 |
| 配置同步 | ✓ | 低频操作,对一致性要求高 |
# 四、三阶段提交协议 (3PC)
# 1、3PC 基本原理
三阶段提交是 2PC 的改进版,将提交过程分为三个阶段:
# 阶段一: CanCommit
协调者 参与者
|-------- CanCommit? --------->|
| | 检查是否能执行事务
|<------- Yes/No ---------------|
参与者只需检查是否能执行,不实际执行事务,不锁定资源。
# 阶段二: PreCommit
情况1: 所有参与者返回 Yes
协调者 参与者
|-------- PreCommit ---------->|
| | 执行事务,锁定资源
|<------- ACK ------------------|
情况2: 任一参与者返回 No
协调者 参与者
|-------- Abort -------------->|
| | 中止操作
# 阶段三: DoCommit
情况1: PreCommit 阶段都成功
协调者 参与者
|-------- DoCommit ----------->|
| | 正式提交
|<------- ACK ------------------|
情况2: 超时或失败
参与者在超时后自动提交 (与 2PC 的关键区别)
# 2、3PC 的改进
| 改进点 | 2PC 问题 | 3PC 解决方案 |
|---|---|---|
| 同步阻塞 | 准备阶段就锁定资源 | CanCommit 阶段不锁资源 |
| 单点故障 | 协调者故障导致永久阻塞 | 参与者超时后自动提交 |
| 数据一致性 | 网络分区可能导致不一致 | 降低了不一致概率(但仍存在) |
# 3、3PC 的问题
虽然 3PC 改进了 2PC,但仍有问题:
网络分区下的一致性问题:
协调者发送 DoCommit 后网络分区:
- 分区1: 收到 DoCommit → 提交
- 分区2: 超时自动提交
- 看似一致,但如果协调者实际发的是 Abort 呢?
结论: 3PC 降低了阻塞概率,但无法完全解决一致性问题。
# 五、TCC 补偿型事务
# 1、TCC 基本原理
TCC 是一种补偿型事务方案,将业务操作分为三个阶段:
- T (Try): 尝试执行,预留资源
- C (Confirm): 确认执行,使用预留资源
- C (Cancel): 取消执行,释放预留资源
核心思想: 将事务的提交和回滚逻辑下放到业务层。
# 2、TCC 执行流程
阶段一: Try 阶段
订单服务: 创建订单(待支付状态)
库存服务: 冻结库存(库存-1, 冻结+1)
积分服务: 冻结积分
支付服务: 预扣款(余额-100, 冻结+100)
阶段二: Confirm/Cancel
如果所有 Try 成功:
订单服务: 更新订单为已支付
库存服务: 扣减冻结库存
积分服务: 增加积分
支付服务: 扣减冻结金额
如果任一 Try 失败:
订单服务: 删除订单
库存服务: 释放冻结库存
积分服务: 释放冻结积分
支付服务: 释放冻结金额
# 3、TCC 代码示例
账户服务 TCC 实现:
@Service
public class AccountTccService {
@Autowired
private AccountDao accountDao;
/**
* Try: 冻结金额
*/
@Transactional
public boolean tryDeduct(String accountId, BigDecimal amount) {
Account account = accountDao.findById(accountId);
// 检查余额是否足够
if (account.getBalance().compareTo(amount) < 0) {
return false;
}
// 冻结金额: balance - amount, frozen + amount
account.setBalance(account.getBalance().subtract(amount));
account.setFrozen(account.getFrozen().add(amount));
accountDao.update(account);
return true;
}
/**
* Confirm: 扣减冻结金额
*/
@Transactional
public void confirmDeduct(String accountId, BigDecimal amount) {
Account account = accountDao.findById(accountId);
// 扣减冻结金额: frozen - amount
account.setFrozen(account.getFrozen().subtract(amount));
accountDao.update(account);
}
/**
* Cancel: 释放冻结金额
*/
@Transactional
public void cancelDeduct(String accountId, BigDecimal amount) {
Account account = accountDao.findById(accountId);
// 释放冻结: balance + amount, frozen - amount
account.setBalance(account.getBalance().add(amount));
account.setFrozen(account.getFrozen().subtract(amount));
accountDao.update(account);
}
}
TCC 事务协调器:
@Service
public class TccCoordinator {
@Autowired
private AccountTccService accountService;
@Autowired
private InventoryTccService inventoryService;
@Autowired
private OrderTccService orderService;
public boolean executeTransaction(TransferRequest request) {
String txId = UUID.randomUUID().toString();
try {
// Try 阶段: 尝试所有操作
boolean tryResult = tryAll(txId, request);
if (tryResult) {
// Confirm 阶段: 确认所有操作
confirmAll(txId, request);
return true;
} else {
// Cancel 阶段: 取消所有操作
cancelAll(txId, request);
return false;
}
} catch (Exception e) {
// 异常时取消
cancelAll(txId, request);
return false;
}
}
private boolean tryAll(String txId, TransferRequest request) {
// 记录事务日志
logTccTransaction(txId, TccPhase.TRY);
boolean accountTry = accountService.tryDeduct(
request.getAccountId(), request.getAmount());
boolean inventoryTry = inventoryService.tryReserve(
request.getProductId(), request.getQuantity());
boolean orderTry = orderService.tryCreate(request);
return accountTry && inventoryTry && orderTry;
}
private void confirmAll(String txId, TransferRequest request) {
logTccTransaction(txId, TccPhase.CONFIRM);
accountService.confirmDeduct(request.getAccountId(), request.getAmount());
inventoryService.confirmReserve(request.getProductId(), request.getQuantity());
orderService.confirmCreate(request);
}
private void cancelAll(String txId, TransferRequest request) {
logTccTransaction(txId, TccPhase.CANCEL);
accountService.cancelDeduct(request.getAccountId(), request.getAmount());
inventoryService.cancelReserve(request.getProductId(), request.getQuantity());
orderService.cancelCreate(request);
}
}
# 4、TCC 的三大挑战
# 4.1、空回滚问题
场景: Try 阶段因网络超时未执行,但 Cancel 被调用:
Try 请求发出 → 网络超时 → 协调器认为失败 → 调用 Cancel
但实际上 Try 根本没执行,没有资源可释放
解决: Cancel 需要检查 Try 是否执行过:
public void cancelDeduct(String txId, String accountId, BigDecimal amount) {
// 检查事务记录
TccLog log = tccLogDao.findByTxId(txId);
if (log == null) {
// Try 未执行,记录空回滚
tccLogDao.insert(new TccLog(txId, TccPhase.CANCEL));
return;
}
// Try 已执行,正常回滚
Account account = accountDao.findById(accountId);
account.setBalance(account.getBalance().add(amount));
account.setFrozen(account.getFrozen().subtract(amount));
accountDao.update(account);
}
# 4.2、幂等性问题
场景: 由于网络重试,Confirm 或 Cancel 被多次调用:
Confirm 第1次: frozen - 100 (正常)
Confirm 第2次: frozen - 100 (重复扣减!)
解决: 使用事务日志记录执行状态:
public void confirmDeduct(String txId, String accountId, BigDecimal amount) {
// 检查是否已执行
TccLog log = tccLogDao.findByTxId(txId);
if (log != null && log.getPhase() == TccPhase.CONFIRM) {
// 已执行过 Confirm,直接返回
return;
}
// 执行 Confirm 逻辑
Account account = accountDao.findById(accountId);
account.setFrozen(account.getFrozen().subtract(amount));
accountDao.update(account);
// 记录执行状态
tccLogDao.updatePhase(txId, TccPhase.CONFIRM);
}
# 4.3、悬挂问题
场景: Cancel 先于 Try 执行:
1. Try 请求因网络延迟未到达
2. 协调器超时,调用 Cancel (空回滚,记录日志)
3. Try 请求到达并执行 (资源被冻结)
4. 资源永久冻结,无法释放!
解决: Try 需要检查是否已 Cancel:
public boolean tryDeduct(String txId, String accountId, BigDecimal amount) {
// 检查是否已取消
TccLog log = tccLogDao.findByTxId(txId);
if (log != null && log.getPhase() == TccPhase.CANCEL) {
// 已取消,拒绝执行 Try
return false;
}
// 正常执行 Try
Account account = accountDao.findById(accountId);
if (account.getBalance().compareTo(amount) < 0) {
return false;
}
account.setBalance(account.getBalance().subtract(amount));
account.setFrozen(account.getFrozen().add(amount));
accountDao.update(account);
// 记录 Try 执行
tccLogDao.insert(new TccLog(txId, TccPhase.TRY));
return true;
}
# 5、TCC 框架: Seata
Seata 是阿里开源的分布式事务解决方案,支持 TCC 模式。
核心组件:
- TC (Transaction Coordinator): 事务协调器,维护全局事务状态
- TM (Transaction Manager): 事务管理器,发起全局事务
- RM (Resource Manager): 资源管理器,管理分支事务
使用示例:
// 1. 在业务发起方添加 @GlobalTransactional
@Service
public class BusinessService {
@GlobalTransactional
public void purchase(String userId, String productId, int count) {
// 调用各个服务
orderService.create(userId, productId, count);
inventoryService.deduct(productId, count);
accountService.deduct(userId, calculateAmount(productId, count));
}
}
// 2. 在各个服务实现 TCC 接口
@LocalTCC
public interface InventoryTccAction {
@TwoPhaseBusinessAction(name = "inventoryTcc", commitMethod = "commit", rollbackMethod = "rollback")
boolean prepare(@BusinessActionContextParameter(paramName = "productId") String productId,
@BusinessActionContextParameter(paramName = "count") int count);
boolean commit(BusinessActionContext context);
boolean rollback(BusinessActionContext context);
}
# 六、Saga 长事务模式
# 1、Saga 基本原理
Saga 模式将长事务拆分为多个本地短事务,每个子事务都有对应的补偿操作:
T1 → T2 → T3 → T4 (正常流程)
C1 ← C2 ← C3 ← C4 (补偿流程)
核心思想: 不追求原子性,通过补偿达到最终一致性。
# 2、Saga 的两种协调模式
# 2.1、编排模式 (Choreography)
去中心化,各服务通过事件驱动自行协调:
订单服务: 创建订单 → 发布 OrderCreated 事件
↓
库存服务: 监听 OrderCreated → 扣减库存 → 发布 InventoryDeducted 事件
↓
支付服务: 监听 InventoryDeducted → 扣款 → 发布 PaymentCompleted 事件
优点: 解耦,服务自治
缺点: 流程分散,难以理解和调试
# 2.2、编排模式 (Orchestration)
中心化,由协调器统一管理流程:
Saga 协调器:
1. 调用订单服务创建订单
2. 调用库存服务扣减库存
3. 调用支付服务扣款
4. 如果任一步骤失败,按相反顺序执行补偿
优点: 流程清晰,易于管理
缺点: 协调器是单点,需要保证高可用
# 3、Saga 代码示例
Saga 协调器实现:
@Service
public class OrderSagaOrchestrator {
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private SagaLogRepository sagaLogRepo;
public boolean executeOrderSaga(OrderRequest request) {
String sagaId = UUID.randomUUID().toString();
SagaLog sagaLog = new SagaLog(sagaId);
try {
// 步骤1: 创建订单
String orderId = orderService.createOrder(request);
sagaLog.addStep("createOrder", orderId);
sagaLogRepo.save(sagaLog);
// 步骤2: 扣减库存
boolean inventoryOk = inventoryService.deduct(
request.getProductId(), request.getQuantity());
if (!inventoryOk) {
throw new SagaException("库存不足");
}
sagaLog.addStep("deductInventory", request.getProductId());
sagaLogRepo.save(sagaLog);
// 步骤3: 扣款
boolean paymentOk = paymentService.pay(
request.getUserId(), request.getAmount());
if (!paymentOk) {
throw new SagaException("余额不足");
}
sagaLog.addStep("payment", request.getUserId());
sagaLogRepo.save(sagaLog);
// 所有步骤成功
sagaLog.setStatus(SagaStatus.COMPLETED);
sagaLogRepo.save(sagaLog);
return true;
} catch (Exception e) {
// 执行补偿
compensate(sagaLog);
return false;
}
}
private void compensate(SagaLog sagaLog) {
// 按相反顺序执行补偿
List<SagaStep> steps = sagaLog.getSteps();
for (int i = steps.size() - 1; i >= 0; i--) {
SagaStep step = steps.get(i);
try {
switch (step.getName()) {
case "createOrder":
orderService.cancelOrder(step.getData());
break;
case "deductInventory":
inventoryService.restore(step.getData(), sagaLog.getQuantity());
break;
case "payment":
paymentService.refund(step.getData(), sagaLog.getAmount());
break;
}
step.setCompensated(true);
} catch (Exception e) {
// 补偿失败,记录日志,后续人工处理
log.error("Compensation failed for step: {}", step.getName(), e);
}
}
sagaLog.setStatus(SagaStatus.COMPENSATED);
sagaLogRepo.save(sagaLog);
}
}
补偿操作实现:
@Service
public class InventoryService {
@Transactional
public boolean deduct(String productId, int quantity) {
Inventory inventory = inventoryDao.findById(productId);
if (inventory.getStock() < quantity) {
return false;
}
inventory.setStock(inventory.getStock() - quantity);
inventoryDao.update(inventory);
return true;
}
/**
* 补偿操作: 恢复库存
*/
@Transactional
public void restore(String productId, int quantity) {
Inventory inventory = inventoryDao.findById(productId);
inventory.setStock(inventory.getStock() + quantity);
inventoryDao.update(inventory);
}
}
# 4、Saga 的注意事项
# 4.1、补偿操作的设计原则
| 原则 | 说明 | 示例 |
|---|---|---|
| 幂等性 | 多次执行结果相同 | 使用唯一ID防止重复退款 |
| 可补偿性 | 每个操作都有对应补偿 | 扣库存 ↔ 加库存 |
| 可重试性 | 补偿操作失败后可重试 | 网络超时后重试退款 |
| 最终一致性 | 补偿后达到一致状态 | 订单取消后,库存、账户都恢复 |
# 4.2、补偿的局限性
不可补偿的场景:
- 发送短信通知 (无法"取消发送")
- 发放优惠券 (用户已使用)
- 物理世界的操作 (打印发票)
解决思路:
- 延迟执行: 等事务确认后再发送通知
- 对冲操作: 发送"订单取消"通知
- 人工介入: 记录日志,人工处理
# 5、Saga 框架: Eventuate Tram Saga
Eventuate Tram 是一个基于事件驱动的 Saga 框架。
定义 Saga:
public class OrderSaga implements SimpleSaga<OrderSagaData> {
private SagaDefinition<OrderSagaData> sagaDefinition;
public OrderSaga(OrderService orderService,
InventoryService inventoryService,
PaymentService paymentService) {
this.sagaDefinition = step()
.invokeLocal(orderService::create)
.withCompensation(orderService::cancel)
.step()
.invokeParticipant(inventoryService.deduct())
.withCompensation(inventoryService.restore())
.step()
.invokeParticipant(paymentService.pay())
.withCompensation(paymentService.refund())
.build();
}
@Override
public SagaDefinition<OrderSagaData> getSagaDefinition() {
return sagaDefinition;
}
}
# 七、最终一致性方案
# 1、本地消息表模式
核心思想: 利用本地事务保证业务操作和消息发送的原子性。
执行流程:
1. 开启本地事务:
- 执行业务操作 (如创建订单)
- 插入消息记录到消息表
- 提交事务
2. 定时任务扫描消息表:
- 查询未发送的消息
- 发送到消息队列
- 标记为已发送
3. 消费者处理消息:
- 执行下游业务 (如扣减库存)
- 返回 ACK
代码实现:
@Service
public class OrderService {
@Autowired
private OrderDao orderDao;
@Autowired
private LocalMessageDao messageDao;
@Transactional
public void createOrder(OrderRequest request) {
// 1. 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setStatus(OrderStatus.PENDING);
orderDao.insert(order);
// 2. 插入本地消息
LocalMessage message = new LocalMessage();
message.setTopic("order_created");
message.setKey(order.getId());
message.setContent(JSON.toJSONString(order));
message.setStatus(MessageStatus.PENDING);
messageDao.insert(message);
// 3. 事务提交 (订单和消息同时成功或失败)
}
}
消息发送定时任务:
@Component
public class MessagePublisher {
@Autowired
private LocalMessageDao messageDao;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedDelay = 5000) // 每5秒扫描一次
public void publishPendingMessages() {
List<LocalMessage> messages = messageDao.findPendingMessages(100);
for (LocalMessage message : messages) {
try {
// 发送到消息队列
rocketMQTemplate.convertAndSend(message.getTopic(), message.getContent());
// 标记为已发送
message.setStatus(MessageStatus.SENT);
message.setSentTime(LocalDateTime.now());
messageDao.update(message);
} catch (Exception e) {
// 发送失败,下次重试
log.error("Failed to send message: {}", message.getId(), e);
}
}
}
}
优点:
- 实现简单,无需引入分布式事务框架
- 消息不会丢失 (存储在数据库)
缺点:
- 需要额外的消息表和定时任务
- 消息发送有延迟
# 2、事务消息模式
RocketMQ 和 Kafka 都支持事务消息。
RocketMQ 事务消息流程:
1. 发送半消息 (Half Message):
生产者 → RocketMQ: 发送半消息
RocketMQ: 存储半消息,对消费者不可见
2. 执行本地事务:
生产者: 执行本地事务 (如创建订单)
3. 提交或回滚消息:
如果本地事务成功:
生产者 → RocketMQ: Commit
RocketMQ: 半消息对消费者可见
如果本地事务失败:
生产者 → RocketMQ: Rollback
RocketMQ: 删除半消息
4. 事务回查 (如果长时间未提交):
RocketMQ → 生产者: 查询本地事务状态
生产者: 检查订单是否创建成功
生产者 → RocketMQ: Commit/Rollback
代码实现:
@Service
public class OrderTransactionService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderDao orderDao;
public void createOrderWithTransactionMessage(OrderRequest request) {
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order_topic",
MessageBuilder.withPayload(request).build(),
request
);
}
}
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
OrderRequest request = (OrderRequest) arg;
// 执行本地事务: 创建订单
orderService.createOrder(request);
// 提交消息
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 回滚消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务状态回查
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
OrderRequest request = JSON.parseObject(
new String(msg.getBody()), OrderRequest.class);
// 查询订单是否创建成功
Order order = orderService.getOrder(request.getOrderId());
if (order != null) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
消费者实现:
@Service
@RocketMQMessageListener(topic = "order_topic", consumerGroup = "inventory_consumer")
public class InventoryConsumer implements RocketMQListener<OrderRequest> {
@Autowired
private InventoryService inventoryService;
@Override
public void onMessage(OrderRequest request) {
// 幂等性检查
if (isDuplicate(request.getOrderId())) {
return;
}
// 扣减库存
inventoryService.deduct(request.getProductId(), request.getQuantity());
// 记录消费记录
recordConsumption(request.getOrderId());
}
}
# 3、最大努力通知模式
适用场景: 对一致性要求不高,允许数据最终不一致。
核心思想: 尽最大努力发送通知,失败后定时重试,超过一定次数后放弃。
执行流程:
1. 执行本地事务
2. 发送通知 (HTTP/MQ)
3. 如果失败,记录到重试表
4. 定时任务扫描重试表:
- 重试发送 (指数退避: 1分钟, 5分钟, 30分钟...)
- 达到最大重试次数后,标记为失败
- 人工介入处理
典型应用:
- 支付回调通知
- 短信/邮件发送
- 第三方系统同步
代码示例:
@Service
public class PaymentNotificationService {
@Autowired
private NotificationRetryDao retryDao;
public void notifyOrderPaid(String orderId) {
try {
// 尝试发送通知
sendNotification(orderId);
} catch (Exception e) {
// 失败,记录到重试表
NotificationRetry retry = new NotificationRetry();
retry.setOrderId(orderId);
retry.setRetryCount(0);
retry.setNextRetryTime(LocalDateTime.now().plusMinutes(1));
retryDao.insert(retry);
}
}
@Scheduled(fixedDelay = 60000) // 每分钟执行一次
public void retryFailedNotifications() {
List<NotificationRetry> retries = retryDao.findPendingRetries();
for (NotificationRetry retry : retries) {
if (retry.getRetryCount() >= 10) {
// 超过最大重试次数,标记为失败
retry.setStatus(RetryStatus.FAILED);
retryDao.update(retry);
continue;
}
try {
sendNotification(retry.getOrderId());
// 成功,删除重试记录
retryDao.delete(retry.getId());
} catch (Exception e) {
// 失败,增加重试次数,计算下次重试时间 (指数退避)
retry.setRetryCount(retry.getRetryCount() + 1);
int delayMinutes = (int) Math.pow(2, retry.getRetryCount());
retry.setNextRetryTime(LocalDateTime.now().plusMinutes(delayMinutes));
retryDao.update(retry);
}
}
}
}
# 八、分布式事务实战经验
# 1、技术选型决策树
1. 是否要求强一致性?
├─ 是 → 2PC/3PC (仅限小规模系统)
└─ 否 → 继续判断
2. 业务是否可补偿?
├─ 是 → TCC 或 Saga
│ ├─ 需要严格的中间状态 → TCC
│ └─ 流程较长,允许最终一致 → Saga
└─ 否 → 继续判断
3. 是否可以异步处理?
├─ 是 → 消息队列 + 最终一致性
│ ├─ 对可靠性要求高 → 事务消息
│ └─ 允许少量丢失 → 最大努力通知
└─ 否 → 考虑业务拆分或重新设计
# 2、常见场景的方案选择
| 业务场景 | 推荐方案 | 理由 |
|---|---|---|
| 电商下单 | Saga + 消息队列 | 流程长,允许最终一致,性能要求高 |
| 支付转账 | TCC | 涉及资金,需要严格的中间状态控制 |
| 库存扣减 | TCC + 预扣 | 防止超卖,需要原子性操作 |
| 积分增加 | 消息队列 | 允许延迟,对一致性要求不高 |
| 订单状态同步 | 事务消息 | 需要保证消息可靠性,但允许延迟 |
| 第三方回调 | 最大努力通知 | 第三方系统不可控,尽力而为 |
# 3、分布式事务的最佳实践
# 3.1、能不用就不用
优先考虑:
- 业务拆分: 将强一致性需求拆分到单个服务内
- 异步化: 将同步调用改为异步消息
- 最终一致性: 接受数据短暂不一致
反例:
// 不好的设计: 一个事务跨5个服务
@GlobalTransactional
public void complexOperation() {
serviceA.method();
serviceB.method();
serviceC.method();
serviceD.method();
serviceE.method();
}
改进:
// 好的设计: 核心操作在一个服务内,其他异步处理
@Transactional
public void coreOperation() {
// 核心业务在本地事务内完成
orderDao.create(order);
orderItemDao.batchInsert(items);
// 发送事件,异步处理其他操作
eventPublisher.publish(new OrderCreatedEvent(order));
}
# 3.2、幂等性设计
所有分布式事务操作都必须保证幂等性。
实现方式:
1. 唯一ID + 状态检查:
@Transactional
public void processPayment(String paymentId, BigDecimal amount) {
// 检查是否已处理
Payment payment = paymentDao.findById(paymentId);
if (payment != null && payment.getStatus() == PaymentStatus.SUCCESS) {
return; // 已处理,直接返回
}
// 执行支付逻辑
// ...
}
2. 去重表:
@Transactional
public void deductInventory(String requestId, String productId, int quantity) {
// 检查请求是否已处理
if (deduplicationDao.exists(requestId)) {
return;
}
// 插入去重记录
deduplicationDao.insert(requestId);
// 执行库存扣减
inventoryDao.deduct(productId, quantity);
}
3. 乐观锁:
@Transactional
public void updateInventory(String productId, int quantity, int version) {
int updated = inventoryDao.updateWithVersion(productId, quantity, version);
if (updated == 0) {
throw new ConcurrentModificationException("库存已被修改");
}
}
# 3.3、超时和重试策略
设置合理的超时时间:
@GlobalTransactional(timeoutMills = 60000) // 1分钟超时
public void longRunningTransaction() {
// ...
}
重试策略:
@Retryable(
value = {NetworkException.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void remoteCall() {
// 网络调用,失败后重试
}
重试注意事项:
- 只重试临时性错误 (网络超时、服务暂不可用)
- 不重试业务错误 (余额不足、库存不足)
- 必须保证幂等性
# 3.4、监控和告警
关键指标:
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| 事务成功率 | 提交成功的事务占比 | < 95% |
| 事务耗时 | 事务从开始到结束的时间 | P99 > 5s |
| 补偿率 | 需要补偿的事务占比 | > 5% |
| 消息积压 | 未消费的消息数量 | > 10000 |
| 重试次数 | 平均重试次数 | > 3 |
日志记录:
@Aspect
@Component
public class TransactionLogAspect {
@Around("@annotation(globalTransactional)")
public Object logTransaction(ProceedingJoinPoint pjp,
GlobalTransactional globalTransactional) throws Throwable {
String txId = RootContext.getXID();
long startTime = System.currentTimeMillis();
try {
Object result = pjp.proceed();
long duration = System.currentTimeMillis() - startTime;
log.info("Transaction success: txId={}, duration={}ms", txId, duration);
return result;
} catch (Exception e) {
long duration = System.currentTimeMillis() - startTime;
log.error("Transaction failed: txId={}, duration={}ms", txId, duration, e);
throw e;
}
}
}
# 4、常见问题和解决方案
# 4.1、问题: 分布式事务性能低
原因:
- 多次网络调用
- 资源锁定时间长
- 同步等待
解决方案:
- 减少分布式事务的使用范围
- 使用异步消息代替同步调用
- 采用 AP 模式 (TCC/Saga) 代替 CP 模式 (2PC)
# 4.2、问题: 事务悬挂或长时间未完成
原因:
- 协调器故障
- 参与者超时
- 网络分区
解决方案:
- 设置全局超时时间
- 实现事务恢复机制
- 定时扫描长时间未完成的事务
@Scheduled(fixedDelay = 300000) // 每5分钟执行一次
public void cleanupZombieTransactions() {
// 查询超过30分钟未完成的事务
List<Transaction> zombies = transactionDao.findZombieTransactions(30);
for (Transaction tx : zombies) {
try {
// 尝试回滚
transactionManager.rollback(tx.getId());
} catch (Exception e) {
// 记录日志,人工介入
log.error("Failed to cleanup zombie transaction: {}", tx.getId(), e);
}
}
}
# 4.3、问题: 消息重复消费
原因:
- 消费者确认失败
- 网络波动导致重传
- 消息队列重平衡
解决方案: 实现消费幂等
@Component
public class IdempotentMessageConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void consume(Message message) {
String messageId = message.getMessageId();
// 使用 Redis 去重,设置过期时间
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(messageId, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(success)) {
// 已消费过,跳过
return;
}
// 处理消息
processMessage(message);
}
}
# 九、总结与展望
# 1、核心要点回顾
| 方案 | 一致性 | 可用性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|---|
| 2PC/3PC | 强一致 | 低 | 低 | 高 | 小规模、强一致需求 |
| TCC | 最终一致 | 中 | 中 | 高 | 金融、交易、库存 |
| Saga | 最终一致 | 高 | 高 | 中 | 长流程、可补偿业务 |
| 消息队列 | 最终一致 | 高 | 高 | 低 | 异步、允许延迟 |
| 最大努力通知 | 弱一致 | 高 | 高 | 低 | 第三方通知、不重要数据 |
# 2、设计原则
- 能不用分布式事务就不用: 优先考虑业务拆分和最终一致性
- 选择合适的一致性级别: 不是所有场景都需要强一致性
- 保证幂等性: 所有操作必须支持重试
- 做好监控和告警: 及时发现和处理问题
- 设计降级方案: 关键路径要有人工介入机制
# 3、未来发展方向
- Serverless 事务: 云原生环境下的分布式事务
- AI 辅助决策: 自动选择最优的事务方案
- 跨链事务: 区块链领域的分布式事务
- 边缘计算事务: 边缘节点之间的事务协调
# 4、学习建议
- 理论结合实践: 在实际项目中应用,加深理解
- 研究开源框架: 深入学习 Seata、Eventuate 等框架源码
- 关注故障案例: 从大厂的事故中学习经验教训
- 持续跟进技术: 分布式事务领域不断演进
分布式事务没有银弹,只有最适合的方案。在实际工作中,需要根据业务特点、团队能力、系统规模等因素,综合权衡,做出最佳选择。
祝你变得更强!