分布式事务概述完全指南
# 分布式事务概述完全指南

## 前言

分布式事务是微服务架构中最具挑战性的技术问题之一。由于微服务系统中的服务分散在不同的数据库和服务器上，传统的本地事务机制无法满足需求。本文将深入探讨分布式事务的理论基础、解决方案和最佳实践。

## 一、分布式事务挑战

### 1.1 CAP定理

CAP定理指出，分布式系统无法同时满足一致性（Consistency）、可用性（Availability）和分区容错性（Partition Tolerance）。

```text
┌─────────────────────────────────────────────────────┐
│                     CAP Theorem                     │
│                                                     │
│                  ┌───────────────┐                  │
│                  │  Consistency  │                  │
│                  │       C       │                  │
│                  └───────┬───────┘                  │
│                          │                          │
│              ┌───────────┼───────────┐              │
│              │           │           │              │
│              ▼           │           ▼              │
│       ┌──────────────┐   │   ┌──────────────┐       │
│       │ Availability │   │   │  Partition   │       │
│       │      A       │   │   │  Tolerance   │       │
│       └──────────────┘   │   │      P       │       │
│                          │   └──────────────┘       │
│              └───────────┼───────────┘              │
│                          │                          │
│                     Base Theory                     │
└─────────────────────────────────────────────────────┘
```

### 1.2 分布式事务问题

```java
// 分布式事务问题场景
@Service
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private ProductServiceClient productService;

    @Autowired
    private PaymentServiceClient paymentService;

    @Transactional // 这个事务只能管理本地资源
    public void createOrder(OrderCreateRequest request) {
        // 1. 创建订单（本地数据库）
        Order order = new Order();
        order.setAmount(request.getAmount());
        orderRepository.save(order);

        // 2. 扣减库存（远程服务）
        productService.reduceStock(request.getProductId(), request.getQuantity());

        // 3. 发起支付（远程服务）
        paymentService.processPayment(order.getId(), request.getPaymentMethod());

        // 如果步骤3失败，前面的步骤无法回滚
    }
}
```

## 二、Seata分布式事务

### 2.1 Seata架构

```text
┌─────────────────────────────────────────────────────┐
│                 Seata Architecture                  │
│                                                     │
│  ┌──────────────────────────────────────────────┐   │
│  │         TC (Transaction Coordinator)         │   │
│  │  ┌────────────────────────────────────────┐  │   │
│  │  │   Global Session      Branch Session   │  │   │
│  │  │   Management          Management       │  │   │
│  │  └────────────────────────────────────────┘  │   │
│  └──────────────────────────────────────────────┘   │
│            ▲                          ▲             │
│            │ TM                       │ TM          │
│     ┌──────┴──────────────────────────┴──────┐      │
│     │        TM (Transaction Manager)        │      │
│     │        ┌──────────────────────┐        │      │
│     │        │    Business Logic    │        │      │
│     │        └──────────────────────┘        │      │
│     └────────────────────────────────────────┘      │
│                          │                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │  Service A  │  │  Service B  │  │  Service C  │  │
│  │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │  │
│  │ │   RM    │ │  │ │   RM    │ │  │ │   RM    │ │  │
│  │ │ (Branch │ │  │ │ (Branch │ │  │ │ (Branch │ │  │
│  │ │ Session)│ │  │ │ Session)│ │  │ │ Session)│ │  │
│  │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  │
└─────────────────────────────────────────────────────┘
```

### 2.2 Seata Server配置

```conf
# registry.conf
registry {
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    cluster = "default"
    weight = 1
  }
}

config {
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    dataId = "seataConfig"
    password = ""
  }
}
```

```yaml
# application.yml
server:
  port: 8091

spring:
  application:
    name: seata-server

seata:
  config:
    type: nacos
    nacos:
      serverAddr: 127.0.0.1:8848
      group: SEATA_GROUP
  registry:
    type: nacos
    nacos:
      serverAddr: 127.0.0.1:8848
      group: SEATA_GROUP
  store:
    mode: db
    db:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/seata?useSSL=false
      user: root
      password: password
    session:
      mode: db
  server:
    vgroup-mapping:
      my-tx-group: default
    enable-degrade: false
    recovery:
      committing-retry-period: 1000
      async-committing-retry-period: 1000
```

### 2.3 Seata AT模式

```java
// 启用分布式事务
@SpringBootApplication
@EnableAutoDataSourceProxy
@EnableDiscoveryClient
public class OrderServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

// 分布式事务示例
@Service
public class OrderServiceImpl implements OrderService {

    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    @Override
    public OrderDTO createOrder(CreateOrderRequest request) {
        log.info("Creating order: {}", request);

        // 1. 创建订单（自动注册分支事务）
        Order order = createLocalOrder(request);

        // 2. 扣减库存（远程调用）
        boolean stockReduced = inventoryClient.reduceStock(
            request.getProductId(),
            request.getQuantity()
        );

        if (!stockReduced) {
            throw new BusinessException("Stock reduction failed");
        }

        // 3. 发送消息（本地操作）
        orderMessageProducer.sendOrderCreated(order);

        return orderMapper.toDTO(order);
    }

    @Transactional
    public Order createLocalOrder(CreateOrderRequest request) {
        Order order = new Order();
        order.setOrderNumber(generateOrderNumber());
        order.setProductId(request.getProductId());
        order.setQuantity(request.getQuantity());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.PENDING);
        return orderRepository.save(order);
    }
}

// 全局事务配置
@Configuration
public class GlobalTransactionConfig {

    @Value("${spring.cloud.alibaba.seata.tx-session-group}")
    private String txSessionGroup;

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        return new GlobalTransactionScanner(
            "order-service", // 应用名称
            txSessionGroup,
            new VueConfigurationParser() // 自定义参数解析
        );
    }
}
```

### 2.4 Seata TCC模式

```java
// TCC接口定义
public interface InventoryTccService {

    @TwoPhaseBusinessAction(
        name = "reduceStock",
        commitMethod = "commit",
        rollbackMethod = "rollback"
    )
    BusinessActionResponse prepare(
        BusinessActionContext actionContext,
        @BusinessActionContextParameter(paramName = "productId") Long productId,
        @BusinessActionContextParameter(paramName = "quantity") Integer quantity
    );

    boolean commit(BusinessActionContext actionContext);

    boolean rollback(BusinessActionContext actionContext);
}

@Service
@Slf4j
public class InventoryTccServiceImpl implements InventoryTccService {

    @Autowired
    private InventoryRepository inventoryRepository;

    @Override
    @Transactional
    public BusinessActionResponse prepare(
            BusinessActionContext actionContext,
            Long productId,
            Integer quantity) {
        log.info("TCC Prepare: productId={}, quantity={}", productId, quantity);

        Inventory inventory = inventoryRepository
            .findByProductId(productId)
            .orElseThrow(() -> new BusinessException("Inventory not found"));

        if (inventory.getStock() < quantity) {
            throw new BusinessException("Insufficient stock");
        }

        // 预扣减库存
        inventory.setStock(inventory.getStock() - quantity);
        inventory.setFrozenStock(
            inventory.getFrozenStock() + quantity);
        inventoryRepository.save(inventory);

        // 保存分支事务ID
        String xid = actionContext.getXid();
        String branchId = actionContext.getBranchId();

        return BusinessActionResponse.builder()
            .xid(xid)
            .branchId(branchId)
            .build();
    }

    @Override
    @Transactional
    public boolean commit(BusinessActionContext actionContext) {
        log.info("TCC Commit: {}", actionContext);

        Long productId = (Long) actionContext
            .getActionContext("productId");
        Integer quantity = (Integer) actionContext
            .getActionContext("quantity");

        Inventory inventory = inventoryRepository
            .findByProductId(productId)
            .orElseThrow(() -> new BusinessException("Inventory not found"));

        // 释放冻结库存（从冻结转为已售）
        inventory.setFrozenStock(
            inventory.getFrozenStock() - quantity);
        inventory.setSoldStock(
            inventory.getSoldStock() + quantity);
        inventoryRepository.save(inventory);

        return true;
    }

    @Override
    @Transactional
    public boolean rollback(BusinessActionContext actionContext) {
        log.info("TCC Rollback: {}", actionContext);

        Long productId = (Long) actionContext
            .getActionContext("productId");
        Integer quantity = (Integer) actionContext
            .getActionContext("quantity");

        Inventory inventory = inventoryRepository
            .findByProductId(productId)
            .orElseThrow(() -> new BusinessException("Inventory not found"));

        // 释放冻结库存
        inventory.setStock(inventory.getStock() + quantity);
        inventory.setFrozenStock(
            inventory.getFrozenStock() - quantity);
        inventoryRepository.save(inventory);

        return true;
    }
}
```

## 三、Saga模式

### 3.1 Saga编排器

```java
// Saga状态机定义
@Configuration
public class OrderSagaConfig {

    @Bean
    public StateMachineEngine<String, String> orderSagaStateMachineEngine(
            StateMachineConfigurationFactory<String, String> config) {
        StateMachineFactory<String, String> factory =
            new StateMachineFactory<>("order-saga");
        return new DefaultStateMachineService(factory, config)
            .getStateMachineEngine("simple");
    }
}

// Saga补偿服务
@Service
@Slf4j
public class SagaCompensationService {

    public void executeCompensation(List<CompensationStep> steps) {
        for (int i = steps.size() - 1; i >= 0; i--) {
            CompensationStep step = steps.get(i);
            try {
                executeCompensationStep(step);
            } catch (Exception e) {
                log.error("Compensation failed for step: {}",
                    step.getStepName(), e);
                // 记录补偿失败，启动人工干预
                recordCompensationFailure(step, e);
            }
        }
    }

    private void executeCompensationStep(CompensationStep step) {
        switch (step.getStepType()) {
            case RESERVE_STOCK:
                inventoryService.releaseStock(
                    step.getProductId(),
                    step.getQuantity()
                );
                break;
            case PROCESS_PAYMENT:
                paymentService.refund(
                    step.getPaymentId(),
                    step.getAmount()
                );
                break;
            case CREATE_ORDER:
                orderService.cancelOrder(step.getOrderId());
                break;
        }
    }
}
```

## 四、消息事务

### 4.1 RocketMQ事务消息

```java
@Service
@RequiredArgsConstructor
public class OrderTransactionService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Transactional
    public void createOrderWithTransaction(Order order) {
        // 1. 本地事务：创建订单
        Order savedOrder = orderRepository.save(order);

        // 2. 发送半消息
        OrderCreatedEvent event = OrderCreatedEvent.builder()
            .orderId(savedOrder.getId())
            .customerId(savedOrder.getCustomerId())
            .amount(savedOrder.getAmount())
            .items(savedOrder.getItems())
            .build();

        // 设置事务监听器
        Transaction transaction = rocketMQTemplate
            .sendMessageInTransaction(
                "order-topic",
                MessageBuilder
                    .withPayload(event)
                    .setHeader("orderId", savedOrder.getId())
                    .build(),
                order // 本地事务参数
            );

        log.info("Transaction started: {}", transaction);
    }
}

// RocketMQ事务监听器
@Component
@Slf4j
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private InventoryClient inventoryClient;

    @Override
    public RocketMQLocalState executeLocalTransaction(
            Message msg, Object arg) {
        log.info("Executing local transaction");

        try {
            // 执行业务操作
            Order order = (Order) arg;

            // 调用远程服务
            boolean success = inventoryClient.reserveStock(
                order.getProductId(),
                order.getQuantity()
            );

            if (success) {
                return RocketMQLocalState.COMMIT;
            } else {
                return RocketMQLocalState.ROLLBACK;
            }
        } catch (Exception e) {
            log.error("Local transaction failed", e);
            return RocketMQLocalState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalState checkLocalTransaction(Message msg) {
        log.info("Checking local transaction");

        String orderId = msg.getHeaders()
            .get("orderId", String.class);

        Order order = orderRepository.findById(Long.valueOf(orderId))
            .orElse(null);

        if (order != null && order.getStatus() != null) {
            return RocketMQLocalState.COMMIT;
        }

        return RocketMQLocalState.ROLLBACK;
    }
}
```

## 五、最终一致性方案

### 5.1 本地消息表

```java
@Service
@RequiredArgsConstructor
public class TransactionalMessageService {

    @Autowired
    private TransactionalMessageRepository messageRepository;

    @Transactional
    public void saveAndSendMessage(TransactionalMessage message) {
        // 1. 保存本地消息
        message.setStatus(MessageStatus.PENDING);
        message.setCreateTime(LocalDateTime.now());
        messageRepository.save(message);

        // 2. 发送消息
        try {
            sendToMessageQueue(message);
            message.setStatus(MessageStatus.SENT);
            message.setSendTime(LocalDateTime.now());
        } catch (Exception e) {
            message.setStatus(MessageStatus.FAILED);
            message.setErrorMessage(e.getMessage());
        }

        messageRepository.save(message);
    }

    @Scheduled(fixedDelay = 5000)
    public void retryUnsentMessages() {
        List<TransactionalMessage> pendingMessages =
            messageRepository.findByStatus(MessageStatus.PENDING);

        for (TransactionalMessage message : pendingMessages) {
            try {
                sendToMessageQueue(message);
                message.setStatus(MessageStatus.SENT);
                message.setSendTime(LocalDateTime.now());
                message.setRetryCount(message.getRetryCount() + 1);
            } catch (Exception e) {
                message.setRetryCount(message.getRetryCount() + 1);
                message.setErrorMessage(e.getMessage());
            }
            messageRepository.save(message);
        }
    }
}
```

## 六、最佳实践

### 6.1 事务隔离级别

```yaml
# Seata事务隔离级别
seata:
  lock:
    mode: local # 或 remote
    retry-interval: 10
    retry-times: 30
    retry-policy-check-undo: true
```

### 6.2 性能优化

```java
// 批量处理优化
@Service
public class BatchOrderService {

    @GlobalTransactional(name = "batch-create-order", rollbackFor = Exception.class)
    public void batchCreateOrders(List<Order> orders) {
        // 分批处理，减少全局锁竞争
        int batchSize = 100;
        List<List<Order>> batches = Lists.partition(orders, batchSize);

        for (List<Order> batch : batches) {
            for (Order order : batch) {
                orderRepository.save(order);
            }
        }
    }
}
```

## 七、总结

分布式事务是微服务架构中的核心挑战。通过合理选择事务模式（Seata AT/TCC、Saga、消息事务），结合业务场景进行权衡，可以构建可靠的事务处理方案。关键是要理解各方案的特点和适用场景，避免过度设计。