外部系统回调的异步处理架构:接收、落库、MQ消费、推送的完整设计
外部系统回调的异步处理架构接收、落库、MQ消费、推送的完整设计一、架构概述在微服务系统中经常需要接收外部系统的回调通知如物流节点推送、支付结果回调、第三方状态变更然后将处理结果同步给其他下游系统。这类场景的典型架构是外部系统回调 │ ▼ ① API 接口接收快速响应不做重逻辑 │ ▼ ② 落库保存原始数据日志表用于追溯和重试 │ ▼ ③ 发 MQ 消息异步解耦 │ ▼ ④ MQ 消费者处理业务逻辑核心处理可重试 │ ▼ ⑤ 事务提交后通知下游系统再发 MQ 或调接口二、为什么要这样设计2.1 为什么不在 API 接口中直接处理业务直接处理异步处理外部系统等待时间长快速返回成功外部系统不超时处理失败时外部系统会重推失败后内部重试不依赖外部重推无法控制并发MQ 天然限流可控制消费速度一次失败全部丢失数据已落库随时可重试2.2 为什么要先落库再发 MQ场景如果不落库直接发MQ 外部回调 → 发MQ → MQ丢失网络抖动→ 数据永久丢失 ↑ 无法恢复 场景先落库再发MQ 外部回调 → 落库数据安全了→ 发MQ → MQ丢失 ↑ 没关系定时任务扫描日志表重发2.3 为什么消费者要用新事务MQ 消费者通常使用Transactional(propagation REQUIRES_NEW)消费失败时事务回滚MQ 可以重新投递不影响其他消息的消费每条消息独立事务互不干扰注博客https://blog.csdn.net/badao_liumang_qizhi三、完整示例支付结果回调处理3.1 场景描述支付平台在用户支付成功后回调你的系统通知支付结果。你需要接收回调快速返回更新订单状态为已支付通知仓库系统发货通知积分系统加积分3.2 整体架构图支付平台 │ ▼ PaymentCallbackControllerAPI层 ├── 保存回调日志到 payment_callback_log 表 ├── 返回接收成功给支付平台 └── 发 MQ 消息logId │ ▼ PaymentCallbackConsumerMQ消费者 ├── 根据 logId 查询日志表获取回调数据 ├── 更新订单状态数据库事务 ├── 注册事务提交后的操作 │ ├── 发 MQ 通知仓库发货 │ └── 发 MQ 通知积分系统 └── 更新日志表状态为处理成功3.3 代码实现第一层API 接口快速接收/** * 支付回调接口. * 职责接收数据、落库、发MQ快速返回. */RestControllerRequestMapping(/api/callback)publicclassPaymentCallbackController{ResourceprivatePaymentCallbackLogRepositorycallbackLogRepository;ResourceprivatePaymentCallbackMqSendercallbackMqSender;/** * 接收支付平台回调. */PostMapping(/payment-notify)publicCallbackResponsereceivePaymentNotify(RequestBodyPaymentNotifyDtonotifyDto){try{// 1. 保存原始回调数据到日志表PaymentCallbackLogcallbackLognewPaymentCallbackLog();callbackLog.setOrderCode(notifyDto.getOrderCode());callbackLog.setPaymentNo(notifyDto.getPaymentNo());callbackLog.setInputParams(JSON.toJSONString(notifyDto));callbackLog.setStatus(PENDING);// 待处理callbackLog.setReceiveTime(newDate());callbackLogRepository.save(callbackLog);// 2. 发 MQ 异步处理callbackMqSender.send(callbackLog.getId());// 3. 快速返回成功支付平台不会重推returnCallbackResponse.success();}catch(Exceptione){log.error(接收支付回调异常, orderCode:{},notifyDto.getOrderCode(),e);// 返回失败支付平台会重推returnCallbackResponse.fail(处理异常);}}}第二层MQ 发送者/** * 支付回调MQ发送者. */ComponentpublicclassPaymentCallbackMqSender{ResourceprivateRabbitTemplaterabbitTemplate;/** * 发送回调处理消息. */publicvoidsend(IntegerlogId){rabbitTemplate.convertAndSend(payment-callback-exchange,payment.callback.process,logId);log.info(支付回调MQ发送, logId:{},logId);}}第三层MQ 消费者核心业务处理/** * 支付回调MQ消费者. * 职责处理核心业务逻辑更新订单状态通知下游. */ComponentpublicclassPaymentCallbackConsumer{ResourceprivatePaymentCallbackServicepaymentCallbackService;/** * 消费支付回调消息. */RabbitListener(queuespayment-callback-queue)publicvoidconsume(IntegerlogId){log.info(开始处理支付回调, logId:{},logId);try{paymentCallbackService.processCallback(logId);}catch(Exceptione){log.error(处理支付回调异常, logId:{},logId,e);throwe;// 抛异常让MQ重试}}}/** * 支付回调处理服务. */ServicepublicclassPaymentCallbackServiceImplimplementsPaymentCallbackService{ResourceprivatePaymentCallbackLogRepositorycallbackLogRepository;ResourceprivateOrderRepositoryorderRepository;ResourceprivateWarehouseMqSenderwarehouseMqSender;ResourceprivatePointsMqSenderpointsMqSender;/** * 处理支付回调新事务独立于MQ消费框架的事务. */Transactional(propagationPropagation.REQUIRES_NEW,rollbackForException.class)publicvoidprocessCallback(IntegerlogId){// 1. 查询回调日志PaymentCallbackLogcallbackLogcallbackLogRepository.findById(logId).orElse(null);if(callbackLognull){log.warn(回调日志不存在, logId:{},logId);return;}// 2. 幂等检查已处理的不重复处理if(SUCCESS.equals(callbackLog.getStatus())){log.info(回调已处理过, logId:{},logId);return;}// 3. 解析回调数据PaymentNotifyDtonotifyDtoJSON.parseObject(callbackLog.getInputParams(),PaymentNotifyDto.class);// 4. 更新订单状态OrderorderorderRepository.findByOrderCode(notifyDto.getOrderCode());if(ordernull){callbackLog.setStatus(FAILED);callbackLog.setErrorMsg(订单不存在);callbackLogRepository.save(callbackLog);return;}order.setPaymentStatus(PAID);order.setPaymentNo(notifyDto.getPaymentNo());order.setPaymentTime(notifyDto.getPaymentTime());orderRepository.save(order);// 5. 更新日志状态callbackLog.setStatus(SUCCESS);callbackLogRepository.save(callbackLog);// 6. 注册事务提交后通知下游系统IntegerorderIdorder.getId();AfterTransactionActionCollectorcollectornewAfterTransactionActionCollector();// 通知仓库发货collector.addCommitSyncAction(()-{try{warehouseMqSender.sendShipOrder(orderId);}catch(Exceptione){log.warn(通知仓库发货失败, orderId:{},orderId,e);}});// 通知积分系统加积分collector.addCommitSyncAction(()-{try{pointsMqSender.sendAddPoints(order.getUserId(),order.getAmount());}catch(Exceptione){log.warn(通知积分系统失败, orderId:{},orderId,e);}});TransactionSynchronizationManager.registerSynchronization(collector);log.info(支付回调处理完成, logId:{}, orderCode:{},logId,notifyDto.getOrderCode());}}日志表实体/** * 支付回调日志表. */EntityTable(namepayment_callback_log)publicclassPaymentCallbackLog{privateIntegerid;privateStringorderCode;// 订单号privateStringpaymentNo;// 支付流水号privateStringinputParams;// 原始回调数据JSONprivateStringstatus;// PENDING/SUCCESS/FAILEDprivateStringerrorMsg;// 失败原因privateDatereceiveTime;// 接收时间privateIntegerretryCount;// 重试次数}四、关键技术点4.1 接口幂等性外部系统可能重复回调网络超时重试必须保证多次调用结果一致// API 层幂等相同数据不重复落库PaymentCallbackLogexistingLogcallbackLogRepository.findByPaymentNo(notifyDto.getPaymentNo());if(existingLog!null){returnCallbackResponse.success();// 已接收过直接返回成功}// 消费者层幂等已处理的不重复处理if(SUCCESS.equals(callbackLog.getStatus())){return;// 已处理过跳过}4.2 MQ 消费失败重试// 消费者抛异常 → MQ 框架自动重试RabbitListener(queuespayment-callback-queue)publicvoidconsume(IntegerlogId){try{paymentCallbackService.processCallback(logId);}catch(Exceptione){log.error(处理失败, logId:{},logId,e);throwe;// 抛出异常MQ 会重新投递这条消息}}RabbitMQ 重试策略配置spring:rabbitmq:listener:simple:retry:enabled:truemax-attempts:3# 最多重试3次initial-interval:5000# 首次重试间隔5秒multiplier:2# 每次间隔翻倍4.3 定时任务兜底补偿MQ 可能丢消息需要定时任务扫描卡住的数据/** * 定时任务补偿处理卡住的回调. * 每5分钟扫描一次状态为 PENDING 且超过10分钟未处理的记录. */Scheduled(fixedRate300000)publicvoidcompensatePendingCallbacks(){DatethresholdDateUtils.addMinutes(newDate(),-10);ListPaymentCallbackLogpendingLogscallbackLogRepository.findByStatusAndReceiveTimeBefore(PENDING,threshold);for(PaymentCallbackLogcallbackLog:pendingLogs){if(callbackLog.getRetryCount()5){// 超过最大重试次数标记为失败人工介入callbackLog.setStatus(FAILED);callbackLog.setErrorMsg(超过最大重试次数);callbackLogRepository.save(callbackLog);continue;}// 重新发MQ处理callbackLog.setRetryCount(callbackLog.getRetryCount()1);callbackLogRepository.save(callbackLog);callbackMqSender.send(callbackLog.getId());}}4.4 事务传播行为选择// MQ 消费者使用 REQUIRES_NEW每条消息独立事务Transactional(propagationPropagation.REQUIRES_NEW)publicvoidprocessCallback(IntegerlogId){// 如果处理失败只回滚这条消息的事务// 不影响其他消息的消费}传播行为含义适用场景REQUIRED默认加入当前事务没有则新建普通 Service 方法REQUIRES_NEW总是新建事务挂起当前事务MQ 消费者、独立操作NESTED嵌套事务保存点部分失败不影响外层4.5 日志表的状态机PENDING待处理 │ ├── 处理成功 → SUCCESS │ ├── 处理失败可重试→ PENDINGretryCount1 │ └── 超过最大重试次数 → FAILED人工介入五、异常场景处理5.1 API 接收成功但 MQ 发送失败解决方案先落库定时任务兜底 即使MQ发送失败定时任务会扫描 PENDING 状态的记录重新发送5.2 MQ 消费成功但下游通知失败// 解决方案事务提交后通知失败只记日志collector.addCommitSyncAction(()-{try{warehouseMqSender.sendShipOrder(orderId);}catch(Exceptione){// 不抛异常不影响主流程// 由下游系统的定时任务或人工补偿log.warn(通知仓库失败, orderId:{},orderId,e);}});5.3 消费者处理到一半崩溃消费者开始处理 → 更新订单状态 → JVM崩溃 │ ▼ 事务未提交 → 自动回滚 → 订单状态未变 │ ▼ MQ 消息未 ACK → MQ 重新投递 → 消费者重新处理 │ ▼ 幂等检查 → 订单状态仍是未支付 → 正常处理六、与直接同步调用的对比6.1 同步方式不推荐// 外部回调 → 直接处理所有逻辑 → 返回结果PostMapping(/payment-notify)publicCallbackResponsereceivePaymentNotify(PaymentNotifyDtodto){// 更新订单可能慢orderService.updatePaymentStatus(dto);// 通知仓库可能超时warehouseFeign.shipOrder(dto.getOrderCode());// 通知积分可能失败pointsFeign.addPoints(dto.getUserId(),dto.getAmount());returnCallbackResponse.success();}问题外部系统等待时间长所有操作串行任何一步失败外部系统收到失败响应会重推无法控制并发没有重试机制6.2 异步方式推荐外部回调 → 落库 发MQ → 立即返回成功50ms内 │ ▼ MQ消费者异步处理不影响外部系统 │ ├── 失败 → MQ重试 ├── 崩溃 → MQ重新投递 └── 成功 → 事务后通知下游七、技术栈总结技术在架构中的角色解决的问题REST API接收外部回调提供标准化的接入点JPA/MyBatis数据持久化保存日志、更新业务数据RabbitMQ/Kafka异步解耦接收和处理解耦削峰填谷Transactional事务管理保证数据一致性AfterTransactionActionCollector事务后操作确保数据持久化后再通知下游定时任务补偿机制兜底处理MQ丢失或消费失败幂等设计防重复处理应对外部重推和MQ重试日志表数据追溯记录完整处理链路支持排查和重试八、设计原则总结1. 快速响应API 层只做接收和落库不做重逻辑 2. 数据先行先保存数据再发MQ保证数据不丢 3. 异步解耦通过MQ解耦接收和处理互不阻塞 4. 幂等设计每一层都要能处理重复请求 5. 事务隔离MQ消费者用独立事务失败不影响其他消息 6. 事务后通知下游通知在事务提交后执行保证数据可见性 7. 兜底补偿定时任务扫描异常数据保证最终一致性 8. 可追溯日志表记录完整链路支持问题排查和数据恢复