Gerrad Zhang

分布式事务模式与实现方案深度解析

深入探讨分布式事务的核心理论、实现模式和解决方案,结合实际项目经验分享分布式系统中事务一致性保证的最佳实践。

Gerrad Zhang
武汉,中国
2 min read

🤔 问题背景与技术演进

我们要解决什么问题?

在分布式系统中,事务处理面临前所未有的挑战:

  • 跨服务事务:一个业务操作涉及多个微服务,需要保证全部成功或全部失败
  • 数据一致性:分布式环境下如何保证数据的ACID特性
  • 网络不可靠:网络延迟、分区、故障导致的不确定性
  • 性能与一致性权衡:强一致性往往意味着性能损失
  • 复杂性管理:分布式事务的实现和维护复杂度高

没有分布式事务解决方案时是怎么做的?

早期单体应用通常依赖数据库事务:

// 传统单体应用:数据库事务保证一致性
@Transactional
public void transferMoney(Long fromAccount, Long toAccount, BigDecimal amount) {
    // 在同一个数据库事务中执行
    Account from = accountRepository.findById(fromAccount);
    Account to = accountRepository.findById(toAccount);
    
    if (from.getBalance().compareTo(amount) < 0) {
        throw new InsufficientBalanceException();
    }
    
    from.setBalance(from.getBalance().subtract(amount));
    to.setBalance(to.getBalance().add(amount));
    
    accountRepository.save(from);
    accountRepository.save(to);
    
    // 记录转账日志
    TransferLog log = new TransferLog(fromAccount, toAccount, amount);
    transferLogRepository.save(log);
}

传统方案的问题

  • 无法跨越服务边界
  • 数据库成为单点瓶颈
  • 难以水平扩展
  • 服务间耦合度高

技术演进的历史脉络

分布式事务解决方案的发展历程:

  1. 两阶段提交时代(1980-1990):2PC协议,强一致性但性能差
  2. 三阶段提交改进(1990-2000):3PC解决部分2PC问题
  3. 补偿模式兴起(2000-2010):Saga模式,最终一致性
  4. 微服务架构(2010-2015):TCC、消息事务等模式
  5. 云原生时代(2015-2020):事件驱动、CQRS+Event Sourcing
  6. 现代化方案(2020至今):分布式事务中间件、Serverless事务

🎯 核心概念与原理

分布式事务的ACID挑战

在分布式环境下,传统ACID特性面临挑战:

public class DistributedACID {
    
    /**
     * 原子性 (Atomicity) - 分布式环境下的挑战
     */
    public void atomicityChallenge() {
        // 跨服务的操作如何保证全部成功或全部失败?
        orderService.createOrder(order);        // 服务A
        inventoryService.deductStock(product);   // 服务B  
        paymentService.processPayment(payment);  // 服务C
        // 如果服务C失败,如何回滚服务A和B的操作?
    }
    
    /**
     * 一致性 (Consistency) - 数据一致性保证
     */
    public void consistencyChallenge() {
        // 不同服务的数据如何保持一致?
        // 强一致性 vs 最终一致性的权衡
    }
    
    /**
     * 隔离性 (Isolation) - 并发控制
     */
    public void isolationChallenge() {
        // 分布式环境下的锁机制
        // 跨服务的事务隔离级别
    }
    
    /**
     * 持久性 (Durability) - 数据持久化
     */
    public void durabilityChallenge() {
        // 消息丢失、网络分区对持久性的影响
    }
}

CAP定理与BASE理论

CAP定理:分布式系统只能同时满足以下三个特性中的两个

  • Consistency(一致性):所有节点同时看到相同数据
  • Availability(可用性):系统持续可用
  • Partition tolerance(分区容错性):网络分区时系统继续工作

BASE理论:CAP的实践应用

  • Basically Available(基本可用):允许损失部分可用性
  • Soft state(软状态):允许系统中的数据存在中间状态
  • Eventually consistent(最终一致性):系统中的所有数据副本最终能够达到一致的状态

🔧 实现原理与源码分析

两阶段提交(2PC)实现

2PC是最经典的分布式事务协议:

@Component
public class TwoPhaseCommitCoordinator {
    
    private final List<TransactionParticipant> participants;
    private final TransactionLogger transactionLogger;
    
    /**
     * 两阶段提交实现
     */
    public boolean executeTransaction(String transactionId, List<TransactionOperation> operations) {
        // 阶段1:准备阶段 (Prepare Phase)
        boolean allPrepared = preparePhase(transactionId, operations);
        
        if (allPrepared) {
            // 阶段2:提交阶段 (Commit Phase)
            return commitPhase(transactionId);
        } else {
            // 阶段2:回滚阶段 (Abort Phase)
            return abortPhase(transactionId);
        }
    }
    
    /**
     * 准备阶段:询问所有参与者是否可以提交
     */
    private boolean preparePhase(String transactionId, List<TransactionOperation> operations) {
        transactionLogger.logTransactionStart(transactionId);
        
        for (int i = 0; i < participants.size(); i++) {
            TransactionParticipant participant = participants.get(i);
            TransactionOperation operation = operations.get(i);
            
            try {
                // 发送准备请求
                PrepareResult result = participant.prepare(transactionId, operation);
                transactionLogger.logPrepareResult(transactionId, participant.getId(), result);
                
                if (result != PrepareResult.YES) {
                    // 有参与者无法提交,记录并返回失败
                    transactionLogger.logTransactionAbort(transactionId, "Participant " + participant.getId() + " voted NO");
                    return false;
                }
            } catch (Exception e) {
                // 网络异常或参与者故障
                transactionLogger.logTransactionAbort(transactionId, "Participant " + participant.getId() + " failed: " + e.getMessage());
                return false;
            }
        }
        
        transactionLogger.logAllPrepared(transactionId);
        return true;
    }
    
    /**
     * 提交阶段:通知所有参与者提交事务
     */
    private boolean commitPhase(String transactionId) {
        boolean allCommitted = true;
        
        for (TransactionParticipant participant : participants) {
            try {
                participant.commit(transactionId);
                transactionLogger.logCommitResult(transactionId, participant.getId(), true);
            } catch (Exception e) {
                // 提交阶段失败,记录但继续(因为已经决定提交)
                transactionLogger.logCommitResult(transactionId, participant.getId(), false);
                allCommitted = false;
            }
        }
        
        transactionLogger.logTransactionCommit(transactionId, allCommitted);
        return allCommitted;
    }
    
    /**
     * 回滚阶段:通知所有参与者回滚事务
     */
    private boolean abortPhase(String transactionId) {
        for (TransactionParticipant participant : participants) {
            try {
                participant.abort(transactionId);
                transactionLogger.logAbortResult(transactionId, participant.getId(), true);
            } catch (Exception e) {
                transactionLogger.logAbortResult(transactionId, participant.getId(), false);
            }
        }
        
        transactionLogger.logTransactionAbort(transactionId, "Coordinator decision");
        return true;
    }
}

/**
 * 事务参与者接口
 */
public interface TransactionParticipant {
    String getId();
    PrepareResult prepare(String transactionId, TransactionOperation operation);
    void commit(String transactionId);
    void abort(String transactionId);
}

/**
 * 事务参与者实现示例
 */
@Service
public class OrderServiceParticipant implements TransactionParticipant {
    
    @Autowired
    private OrderRepository orderRepository;
    
    private final Map<String, Order> preparedOrders = new ConcurrentHashMap<>();
    
    @Override
    public PrepareResult prepare(String transactionId, TransactionOperation operation) {
        try {
            CreateOrderOperation orderOp = (CreateOrderOperation) operation;
            
            // 检查业务规则
            if (!validateOrder(orderOp.getOrder())) {
                return PrepareResult.NO;
            }
            
            // 预留资源(但不提交)
            Order order = orderOp.getOrder();
            preparedOrders.put(transactionId, order);
            
            return PrepareResult.YES;
        } catch (Exception e) {
            return PrepareResult.NO;
        }
    }
    
    @Override
    public void commit(String transactionId) {
        Order order = preparedOrders.remove(transactionId);
        if (order != null) {
            orderRepository.save(order);
        }
    }
    
    @Override
    public void abort(String transactionId) {
        preparedOrders.remove(transactionId);
    }
}

TCC(Try-Confirm-Cancel)模式

TCC是一种补偿型事务模式:

@Service
public class TCCTransactionManager {
    
    @Autowired
    private List<TCCParticipant> participants;
    
    @Autowired
    private TransactionRepository transactionRepository;
    
    /**
     * TCC事务执行
     */
    public TCCResult executeTransaction(String businessId, TCCContext context) {
        String transactionId = generateTransactionId();
        
        // 创建事务记录
        TCCTransaction transaction = new TCCTransaction(transactionId, businessId, TCCStatus.TRYING);
        transactionRepository.save(transaction);
        
        try {
            // Try阶段:尝试执行业务操作
            boolean trySuccess = tryPhase(transactionId, context);
            
            if (trySuccess) {
                // Confirm阶段:确认执行
                transaction.setStatus(TCCStatus.CONFIRMING);
                transactionRepository.save(transaction);
                
                boolean confirmSuccess = confirmPhase(transactionId, context);
                transaction.setStatus(confirmSuccess ? TCCStatus.CONFIRMED : TCCStatus.CONFIRM_FAILED);
                transactionRepository.save(transaction);
                
                return new TCCResult(confirmSuccess, transactionId);
            } else {
                // Cancel阶段:取消执行
                transaction.setStatus(TCCStatus.CANCELLING);
                transactionRepository.save(transaction);
                
                cancelPhase(transactionId, context);
                transaction.setStatus(TCCStatus.CANCELLED);
                transactionRepository.save(transaction);
                
                return new TCCResult(false, transactionId);
            }
        } catch (Exception e) {
            // 异常情况下取消事务
            cancelPhase(transactionId, context);
            transaction.setStatus(TCCStatus.CANCELLED);
            transactionRepository.save(transaction);
            
            return new TCCResult(false, transactionId);
        }
    }
    
    /**
     * Try阶段:预留资源
     */
    private boolean tryPhase(String transactionId, TCCContext context) {
        for (TCCParticipant participant : participants) {
            try {
                boolean result = participant.tryExecute(transactionId, context);
                if (!result) {
                    // 如果有参与者Try失败,需要Cancel已经Try成功的参与者
                    cancelPreviousParticipants(transactionId, context, participant);
                    return false;
                }
            } catch (Exception e) {
                cancelPreviousParticipants(transactionId, context, participant);
                return false;
            }
        }
        return true;
    }
    
    /**
     * Confirm阶段:确认执行
     */
    private boolean confirmPhase(String transactionId, TCCContext context) {
        boolean allConfirmed = true;
        
        for (TCCParticipant participant : participants) {
            try {
                participant.confirmExecute(transactionId, context);
            } catch (Exception e) {
                allConfirmed = false;
                // Confirm失败需要记录,但不能Cancel(因为可能已经有副作用)
                log.error("Confirm failed for participant: " + participant.getClass().getSimpleName(), e);
            }
        }
        
        return allConfirmed;
    }
    
    /**
     * Cancel阶段:取消执行
     */
    private void cancelPhase(String transactionId, TCCContext context) {
        for (TCCParticipant participant : participants) {
            try {
                participant.cancelExecute(transactionId, context);
            } catch (Exception e) {
                // Cancel失败需要记录并可能需要人工干预
                log.error("Cancel failed for participant: " + participant.getClass().getSimpleName(), e);
            }
        }
    }
}

/**
 * TCC参与者接口
 */
public interface TCCParticipant {
    boolean tryExecute(String transactionId, TCCContext context);
    void confirmExecute(String transactionId, TCCContext context);
    void cancelExecute(String transactionId, TCCContext context);
}

/**
 * 库存服务TCC实现
 */
@Service
public class InventoryTCCService implements TCCParticipant {
    
    @Autowired
    private InventoryRepository inventoryRepository;
    
    @Autowired
    private InventoryReservationRepository reservationRepository;
    
    @Override
    public boolean tryExecute(String transactionId, TCCContext context) {
        InventoryDeductRequest request = context.getInventoryRequest();
        
        // 检查库存是否充足
        Inventory inventory = inventoryRepository.findByProductId(request.getProductId());
        if (inventory.getAvailableStock() < request.getQuantity()) {
            return false;
        }
        
        // 预留库存(冻结)
        InventoryReservation reservation = new InventoryReservation(
            transactionId, 
            request.getProductId(), 
            request.getQuantity()
        );
        reservationRepository.save(reservation);
        
        // 更新可用库存
        inventory.setAvailableStock(inventory.getAvailableStock() - request.getQuantity());
        inventory.setReservedStock(inventory.getReservedStock() + request.getQuantity());
        inventoryRepository.save(inventory);
        
        return true;
    }
    
    @Override
    public void confirmExecute(String transactionId, TCCContext context) {
        InventoryReservation reservation = reservationRepository.findByTransactionId(transactionId);
        if (reservation != null) {
            // 确认扣减:将预留库存转为已扣减
            Inventory inventory = inventoryRepository.findByProductId(reservation.getProductId());
            inventory.setReservedStock(inventory.getReservedStock() - reservation.getQuantity());
            inventory.setTotalStock(inventory.getTotalStock() - reservation.getQuantity());
            inventoryRepository.save(inventory);
            
            // 删除预留记录
            reservationRepository.delete(reservation);
        }
    }
    
    @Override
    public void cancelExecute(String transactionId, TCCContext context) {
        InventoryReservation reservation = reservationRepository.findByTransactionId(transactionId);
        if (reservation != null) {
            // 取消扣减:恢复可用库存
            Inventory inventory = inventoryRepository.findByProductId(reservation.getProductId());
            inventory.setAvailableStock(inventory.getAvailableStock() + reservation.getQuantity());
            inventory.setReservedStock(inventory.getReservedStock() - reservation.getQuantity());
            inventoryRepository.save(inventory);
            
            // 删除预留记录
            reservationRepository.delete(reservation);
        }
    }
}

Saga模式实现

Saga是一种长事务处理模式:

@Component
public class SagaOrchestrator {
    
    @Autowired
    private SagaDefinitionRegistry sagaRegistry;
    
    @Autowired
    private SagaInstanceRepository sagaInstanceRepository;
    
    @Autowired
    private MessagePublisher messagePublisher;
    
    /**
     * 启动Saga事务
     */
    public String startSaga(String sagaType, Object sagaData) {
        SagaDefinition definition = sagaRegistry.getDefinition(sagaType);
        SagaInstance instance = new SagaInstance(UUID.randomUUID().toString(), sagaType, sagaData);
        
        sagaInstanceRepository.save(instance);
        
        // 执行第一个步骤
        executeNextStep(instance, definition);
        
        return instance.getId();
    }
    
    /**
     * 执行下一个步骤
     */
    private void executeNextStep(SagaInstance instance, SagaDefinition definition) {
        if (instance.getCurrentStep() < definition.getSteps().size()) {
            SagaStep step = definition.getSteps().get(instance.getCurrentStep());
            
            try {
                // 发送命令消息
                SagaCommand command = step.buildCommand(instance.getData());
                messagePublisher.publish(step.getCommandTopic(), command);
                
                // 更新实例状态
                instance.setStatus(SagaStatus.EXECUTING);
                instance.addExecutedStep(step.getName());
                sagaInstanceRepository.save(instance);
                
            } catch (Exception e) {
                // 执行失败,开始补偿
                startCompensation(instance, definition);
            }
        } else {
            // 所有步骤执行完成
            instance.setStatus(SagaStatus.COMPLETED);
            sagaInstanceRepository.save(instance);
        }
    }
    
    /**
     * 处理步骤执行结果
     */
    @EventListener
    public void handleStepResult(SagaStepResultEvent event) {
        SagaInstance instance = sagaInstanceRepository.findById(event.getSagaId());
        SagaDefinition definition = sagaRegistry.getDefinition(instance.getType());
        
        if (event.isSuccess()) {
            // 步骤执行成功,继续下一步
            instance.setCurrentStep(instance.getCurrentStep() + 1);
            sagaInstanceRepository.save(instance);
            executeNextStep(instance, definition);
        } else {
            // 步骤执行失败,开始补偿
            startCompensation(instance, definition);
        }
    }
    
    /**
     * 开始补偿流程
     */
    private void startCompensation(SagaInstance instance, SagaDefinition definition) {
        instance.setStatus(SagaStatus.COMPENSATING);
        sagaInstanceRepository.save(instance);
        
        // 逆序执行补偿操作
        List<String> executedSteps = instance.getExecutedSteps();
        for (int i = executedSteps.size() - 1; i >= 0; i--) {
            String stepName = executedSteps.get(i);
            SagaStep step = definition.getStepByName(stepName);
            
            try {
                // 发送补偿命令
                SagaCompensationCommand compensationCommand = step.buildCompensationCommand(instance.getData());
                messagePublisher.publish(step.getCompensationTopic(), compensationCommand);
            } catch (Exception e) {
                // 补偿失败,记录错误
                log.error("Compensation failed for step: " + stepName, e);
            }
        }
        
        instance.setStatus(SagaStatus.COMPENSATED);
        sagaInstanceRepository.save(instance);
    }
}

/**
 * Saga定义
 */
public class OrderSagaDefinition extends SagaDefinition {
    
    public OrderSagaDefinition() {
        super("ORDER_SAGA");
        
        // 定义Saga步骤
        addStep(new SagaStep("CREATE_ORDER")
            .commandTopic("order.create")
            .compensationTopic("order.cancel")
            .commandBuilder(this::buildCreateOrderCommand)
            .compensationBuilder(this::buildCancelOrderCommand));
            
        addStep(new SagaStep("DEDUCT_INVENTORY")
            .commandTopic("inventory.deduct")
            .compensationTopic("inventory.restore")
            .commandBuilder(this::buildDeductInventoryCommand)
            .compensationBuilder(this::buildRestoreInventoryCommand));
            
        addStep(new SagaStep("PROCESS_PAYMENT")
            .commandTopic("payment.process")
            .compensationTopic("payment.refund")
            .commandBuilder(this::buildProcessPaymentCommand)
            .compensationBuilder(this::buildRefundPaymentCommand));
    }
    
    private CreateOrderCommand buildCreateOrderCommand(Object sagaData) {
        OrderSagaData data = (OrderSagaData) sagaData;
        return new CreateOrderCommand(data.getOrderId(), data.getCustomerId(), data.getItems());
    }
    
    private CancelOrderCommand buildCancelOrderCommand(Object sagaData) {
        OrderSagaData data = (OrderSagaData) sagaData;
        return new CancelOrderCommand(data.getOrderId());
    }
}

💡 实战案例与代码示例

电商订单分布式事务

电商下单是分布式事务的典型场景:

@Service
public class OrderTransactionService {
    
    @Autowired
    private TCCTransactionManager tccManager;
    
    @Autowired
    private SagaOrchestrator sagaOrchestrator;
    
    @Autowired
    private MessageTransactionManager messageTransactionManager;
    
    /**
     * 使用TCC模式处理订单
     */
    public OrderResult createOrderWithTCC(CreateOrderRequest request) {
        TCCContext context = new TCCContext();
        context.setOrderRequest(request);
        context.setInventoryRequest(new InventoryDeductRequest(request.getProductId(), request.getQuantity()));
        context.setPaymentRequest(new PaymentRequest(request.getCustomerId(), request.getAmount()));
        
        TCCResult result = tccManager.executeTransaction(request.getOrderId(), context);
        
        return new OrderResult(result.isSuccess(), result.getTransactionId());
    }
    
    /**
     * 使用Saga模式处理订单
     */
    public String createOrderWithSaga(CreateOrderRequest request) {
        OrderSagaData sagaData = new OrderSagaData(request);
        return sagaOrchestrator.startSaga("ORDER_SAGA", sagaData);
    }
    
    /**
     * 使用消息事务处理订单
     */
    public void createOrderWithMessage(CreateOrderRequest request) {
        messageTransactionManager.executeInTransaction(() -> {
            // 1. 创建本地订单
            Order order = createLocalOrder(request);
            
            // 2. 发送消息到其他服务
            OrderCreatedEvent event = new OrderCreatedEvent(order);
            messagePublisher.publish("order.created", event);
            
            return order;
        });
    }
}

/**
 * 消息事务管理器
 */
@Component
public class MessageTransactionManager {
    
    @Autowired
    private DataSource dataSource;
    
    @Autowired
    private MessagePublisher messagePublisher;
    
    /**
     * 本地消息表模式
     */
    @Transactional
    public <T> T executeInTransaction(Supplier<T> businessOperation) {
        // 1. 执行业务操作
        T result = businessOperation.get();
        
        // 2. 将消息保存到本地消息表
        List<OutboxEvent> events = messagePublisher.getPendingEvents();
        for (OutboxEvent event : events) {
            outboxEventRepository.save(event);
        }
        
        return result;
    }
    
    /**
     * 异步发送消息
     */
    @Scheduled(fixedDelay = 5000)
    public void sendPendingMessages() {
        List<OutboxEvent> pendingEvents = outboxEventRepository.findPendingEvents();
        
        for (OutboxEvent event : pendingEvents) {
            try {
                messagePublisher.publishEvent(event);
                event.setStatus(EventStatus.SENT);
                outboxEventRepository.save(event);
            } catch (Exception e) {
                event.incrementRetryCount();
                if (event.getRetryCount() > MAX_RETRY_COUNT) {
                    event.setStatus(EventStatus.FAILED);
                }
                outboxEventRepository.save(event);
            }
        }
    }
}

/**
 * 订单服务消息处理
 */
@Component
public class OrderEventHandler {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private InventoryService inventoryService;
    
    /**
     * 处理库存扣减结果
     */
    @RabbitListener(queues = "inventory.deducted")
    public void handleInventoryDeducted(InventoryDeductedEvent event) {
        if (event.isSuccess()) {
            // 库存扣减成功,继续处理支付
            PaymentRequest paymentRequest = new PaymentRequest(event.getOrderId(), event.getAmount());
            paymentService.processPayment(paymentRequest);
        } else {
            // 库存扣减失败,取消订单
            orderService.cancelOrder(event.getOrderId());
        }
    }
    
    /**
     * 处理支付结果
     */
    @RabbitListener(queues = "payment.processed")
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        if (event.isSuccess()) {
            // 支付成功,订单完成
            orderService.completeOrder(event.getOrderId());
        } else {
            // 支付失败,恢复库存并取消订单
            inventoryService.restoreStock(event.getOrderId());
            orderService.cancelOrder(event.getOrderId());
        }
    }
}

分布式事务中间件集成

使用Seata等分布式事务中间件:

@Configuration
public class SeataConfig {
    
    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        return new GlobalTransactionScanner("order-service", "default");
    }
}

@Service
public class SeataOrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private InventoryServiceClient inventoryServiceClient;
    
    @Autowired
    private PaymentServiceClient paymentServiceClient;
    
    /**
     * 使用Seata AT模式
     */
    @GlobalTransactional(rollbackFor = Exception.class)
    public void createOrderWithSeata(CreateOrderRequest request) {
        // 1. 创建订单
        Order order = new Order(request);
        orderMapper.insert(order);
        
        // 2. 扣减库存(远程调用)
        InventoryDeductRequest inventoryRequest = new InventoryDeductRequest(
            request.getProductId(), request.getQuantity());
        inventoryServiceClient.deductInventory(inventoryRequest);
        
        // 3. 处理支付(远程调用)
        PaymentRequest paymentRequest = new PaymentRequest(
            request.getCustomerId(), request.getAmount());
        paymentServiceClient.processPayment(paymentRequest);
        
        // 任何一步失败都会自动回滚
    }
    
    /**
     * 使用Seata TCC模式
     */
    @GlobalTransactional
    public void createOrderWithSeataTCC(CreateOrderRequest request) {
        // Seata会自动管理TCC的三个阶段
        inventoryTCCService.deductInventory(null, request.getProductId(), request.getQuantity());
        paymentTCCService.processPayment(null, request.getCustomerId(), request.getAmount());
    }
}

/**
 * Seata TCC参与者
 */
@LocalTCC
public interface InventoryTCCService {
    
    @TwoPhaseBusinessAction(name = "deductInventory", commitMethod = "commitDeduct", rollbackMethod = "rollbackDeduct")
    boolean deductInventory(BusinessActionContext context, 
                           @BusinessActionContextParameter("productId") Long productId,
                           @BusinessActionContextParameter("quantity") Integer quantity);
    
    boolean commitDeduct(BusinessActionContext context);
    
    boolean rollbackDeduct(BusinessActionContext context);
}

@Service
public class InventoryTCCServiceImpl implements InventoryTCCService {
    
    @Override
    public boolean deductInventory(BusinessActionContext context, Long productId, Integer quantity) {
        // Try阶段:预留库存
        return inventoryService.reserveInventory(productId, quantity);
    }
    
    @Override
    public boolean commitDeduct(BusinessActionContext context) {
        // Confirm阶段:确认扣减
        Long productId = (Long) context.getActionContext("productId");
        Integer quantity = (Integer) context.getActionContext("quantity");
        return inventoryService.confirmDeduct(productId, quantity);
    }
    
    @Override
    public boolean rollbackDeduct(BusinessActionContext context) {
        // Cancel阶段:释放预留
        Long productId = (Long) context.getActionContext("productId");
        Integer quantity = (Integer) context.getActionContext("quantity");
        return inventoryService.releaseReservation(productId, quantity);
    }
}

🎯 面试高频问题精讲

1. 分布式事务有哪些解决方案?各有什么优缺点?

标准答案

  • 2PC/3PC:强一致性,但性能差、阻塞严重
  • TCC:性能好,但业务侵入性强
  • Saga:适合长事务,但只能保证最终一致性
  • 消息事务:性能好,但实现复杂
  • AT模式:业务无侵入,但依赖中间件

扩展要点

// 方案对比
2PC: 强一致性,同步阻塞,单点故障
TCC: 最终一致性,业务侵入,实现复杂
Saga: 最终一致性,补偿复杂,适合长事务
消息事务: 最终一致性,异步处理,消息可靠性

2. CAP定理在分布式事务中如何应用?

标准答案

  • CP系统:选择一致性和分区容错性,如2PC
  • AP系统:选择可用性和分区容错性,如Saga
  • CA系统:只在单机或无网络分区时可能

面试技巧:结合具体业务场景说明选择原因。

3. 什么是幂等性?如何保证分布式事务的幂等性?

标准答案: 幂等性是指多次执行同一操作的结果相同。保证方法:

  • 唯一性约束:数据库层面防重
  • 状态机:基于状态转换
  • 分布式锁:防止并发重复执行
  • 消息去重:基于消息ID去重

4. Saga模式的补偿策略有哪些?

标准答案

  • 向后恢复:执行补偿操作撤销已完成的步骤
  • 向前恢复:重试失败的步骤直到成功
  • 混合策略:根据步骤特性选择不同策略

5. 分布式事务中如何处理网络分区?

标准答案

  • 超时机制:设置合理的超时时间
  • 重试策略:指数退避重试
  • 熔断器:防止级联故障
  • 降级方案:提供备用处理逻辑

6. 本地消息表模式的实现原理?

标准答案

  1. 业务操作和消息写入在同一个本地事务中
  2. 异步任务扫描消息表发送消息
  3. 消息发送成功后更新状态
  4. 支持重试和失败处理

7. 如何选择分布式事务方案?

标准答案: 考虑因素:

  • 一致性要求:强一致性选2PC,最终一致性选Saga
  • 性能要求:高性能选TCC或消息事务
  • 业务复杂度:简单业务选AT模式
  • 技术栈:考虑现有技术栈的兼容性

8. 分布式事务的监控和运维?

标准答案

  • 事务状态监控:跟踪事务执行状态
  • 性能监控:监控事务执行时间和成功率
  • 异常告警:事务失败、超时告警
  • 数据一致性检查:定期检查数据一致性

⚡ 性能优化与注意事项

分布式事务性能优化

@Component
public class TransactionOptimizer {
    
    /**
     * 异步化优化
     */
    public void asyncOptimization() {
        // 将非关键步骤异步化
        CompletableFuture.runAsync(() -> {
            notificationService.sendOrderNotification(order);
        });
        
        // 使用消息队列解耦
        messagePublisher.publish("order.analytics", new OrderAnalyticsEvent(order));
    }
    
    /**
     * 批量处理优化
     */
    @Scheduled(fixedDelay = 1000)
    public void batchProcessing() {
        List<PendingTransaction> transactions = getPendingTransactions();
        if (!transactions.isEmpty()) {
            // 批量提交事务
            batchCommitTransactions(transactions);
        }
    }
    
    /**
     * 超时控制
     */
    @GlobalTransactional(timeoutMills = 30000)
    public void transactionWithTimeout() {
        // 设置合理的事务超时时间
    }
}

常见问题和解决方案

  1. 长事务问题:拆分为多个短事务
  2. 死锁问题:统一资源访问顺序
  3. 数据倾斜:合理的分片策略
  4. 补偿失败:人工干预机制

监控和告警

@Component
public class TransactionMonitor {
    
    @EventListener
    public void handleTransactionEvent(TransactionEvent event) {
        // 记录事务指标
        transactionMetrics.record(event);
        
        // 检查异常情况
        if (event.getDuration() > SLOW_TRANSACTION_THRESHOLD) {
            alertService.sendSlowTransactionAlert(event);
        }
    }
}

📚 总结与技术对比

核心要点回顾

分布式事务是分布式系统的核心挑战:

  1. 理解权衡:在一致性、可用性、性能间找平衡
  2. 选择合适方案:根据业务特点选择事务模式
  3. 关注运维:建立完善的监控和异常处理机制
  4. 持续优化:基于业务发展调整事务策略

方案对比总结

方案一致性性能复杂度适用场景
2PC强一致强一致性要求
TCC最终一致高性能要求
Saga最终一致长事务处理
消息事务最终一致异步处理
AT模式强一致业务无侵入

持续学习建议

  1. 深入理解分布式理论:CAP、BASE、ACID等
  2. 实践不同事务模式:在项目中尝试各种方案
  3. 关注中间件发展:Seata、DTM等新技术
  4. 学习相关技术:消息队列、分布式锁等

分布式事务是一个复杂但重要的技术领域,需要结合具体业务场景,在一致性、性能和复杂度之间找到最佳平衡点。

Comments

Link copied to clipboard!