订单初版—3.支付和履约链路中的技术问题说明文档
大纲1.订单系统的预支付业务流程
2.订单系统的支付回调业务流程
3.预支付和支付回调的并发问题
4.Redisson分布式锁解决预支付和支付并发问题
5.支付成功后推送订单到履约系统的实现
6.推送订单到履约系统时的消息丢失和消费重复问题
7.推送订单到履约系统时面临的问题
8.推送订单到履约系统失败后的MQ重试与死信机制
9.订单支付成功后的履约流程
10.订单履约状态消息的乱序问题分析
11.订单履约状态消息的挂起重试和幂等处理
12.幂等性介绍
1.订单系统的预支付业务流程
(1)预支付接口使用场景
(2)预支付接口具体实现
(1)预支付接口使用场景
⽤户完成订单提交后需要对订单进⾏⽀付。点击"去⽀付"按钮后,系统调⽤此接⼝完成订单与⽀付,接口调用成功后前端会引导⽤户跳转到⽀付界⾯。
进行预支付处理,主要是为了生成一些信息给第三方支付平台的支付界面。这些支付订单的信息包括:支付商品的名称、支付流水号、支付金额等,这些数据需要支付系统提供并传递给第三方支付平台。
(2)预支付接口具体实现
一.预支付订单的处理流程
二.预支付订单前的检查
三.调用支付系统进行预支付
四.更新订单表与支付信息表
步骤一:查询订单信息
步骤二:加⽀付分布式锁
步骤三:校验订单信息
步骤四:组装请求参数
步骤五:调⽤⽀付系统进⾏预⽀付
步骤六:更新(是否拆单)订单表与⽀付信息表
注意:⽤户可能触发多次订单⽀付,也就是可能会多次调⽤预⽀付接⼝。预⽀付订单接⼝会更改订单与⽀付记录,⽀付回调也会判断订单⽀付状态。所以预⽀付与⽀付回调两个接⼝需要基于分布式锁来保证数据并发安全,这里采⽤Redisson框架的分布式锁。
一.预支付订单的处理流程
@Service
public class OrderServiceImpl implements OrderService {
...
//预支付订单
@Override
@Transactional(rollbackFor = Exception.class)
public PrePayOrderDTO prePayOrder(PrePayOrderRequest prePayOrderRequest) {
//1.入参检查
checkPrePayOrderRequestParam(prePayOrderRequest);
String orderId = prePayOrderRequest.getOrderId();
Integer payAmount = prePayOrderRequest.getPayAmount();
//2.加分布式锁(与订单支付回调时加的是同一把锁)
String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId;
boolean lock = redisLock.lock(key);
if (!lock) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_PRE_PAY_ERROR);
}
try {
//3.预支付订单前的检查
checkPrePayOrderInfo(orderId, payAmount);
//4.调用支付系统进行预支付
PayOrderRequest payOrderRequest = prePayOrderRequest.clone(PayOrderRequest.class);
JsonResult<PayOrderDTO> jsonResult = payApi.payOrder(payOrderRequest);
if (!jsonResult.getSuccess()) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_PRE_PAY_ERROR);
}
PayOrderDTO payOrderDTO = jsonResult.getData();
//5.更新订单表与支付信息表
updateOrderPaymentInfo(payOrderDTO);
return payOrderDTO.clone(PrePayOrderDTO.class);
} finally {
//6.释放分布式锁
redisLock.unlock(key);
}
}
...
}
//预支付订单对象
@Data
public class PrePayOrderRequest extends AbstractObject implements Serializable {
private static final long serialVersionUID = -634137320435888212L;
private String userId;//用户ID
private String businessIdentifier;//业务方标识
private Integer payType;//支付类型
private String orderId;//订单ID
private Integer payAmount;//订单支付金额
private String callbackUrl;//支付成功后跳转地址
private String callbackFailUrl;//支付失败跳转地址
private String openid;//微信openid
private String subject;//订单摘要
private String itemInfo;//商品明细json
}二.预支付订单前的检查
@Service
public class OrderServiceImpl implements OrderService {
...
//预支付订单的前置检查
private void checkPrePayOrderInfo(String orderId, Integer payAmount) {
//查询订单信息
OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);
if (orderInfoDO == null || orderPaymentDetailDO == null) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);
}
//检查订单支付金额
if (!payAmount.equals(orderInfoDO.getPayAmount())) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_AMOUNT_ERROR);
}
//判断一下订单状态
if (!OrderStatusEnum.CREATED.getCode().equals(orderInfoDO.getOrderStatus())) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_STATUS_ERROR);
}
//判断一下支付状态
if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_STATUS_IS_PAID);
}
//判断是否超过了支付超时时间
Date curDate = new Date();
if (curDate.after(orderInfoDO.getExpireTime())) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_PRE_PAY_EXPIRE_ERROR);
}
}
...
}三.调用支付系统进行预支付
@DubboService(version = "1.0.0", interfaceClass = PayApi.class, retries = 0)
public class PayApiImpl implements PayApi {
...
@Override
public JsonResult<PayOrderDTO> payOrder(PayOrderRequest payOrderRequest) {
String orderId = payOrderRequest.getOrderId();
Integer payAmount = payOrderRequest.getPayAmount();
String outTradeNo = RandomUtil.genRandomNumber(19);
//模拟调用了第三方支付平台的支付接口
//组装返回数据
PayOrderDTO payOrderDTO = new PayOrderDTO();
payOrderDTO.setOrderId(orderId);
payOrderDTO.setOutTradeNo(outTradeNo);
payOrderDTO.setPayType(PayTypeEnum.WECHAT_PAY.getCode());
Map<String, Object> payData = new HashMap<>();
payData.put("appid", "wx207d34495e688e0c");
payData.put("prepayId", RandomUtil.genRandomNumber(11));
payData.put("payAmount", payAmount);
payData.put("webUrl", "http://xxx/payurl");
payOrderDTO.setPayData(payData);
return JsonResult.buildSuccess(payOrderDTO);
}
...
}四.更新订单表与支付信息表
@Service
public class OrderServiceImpl implements OrderService {
...
//预支付更新订单支付信息
private void updateOrderPaymentInfo(PayOrderDTO payOrderDTO) {
String orderId = payOrderDTO.getOrderId();
Integer payType = payOrderDTO.getPayType();
String outTradeNo = payOrderDTO.getOutTradeNo();
Date payTime = new Date();
//订单表支付信息
OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
orderInfoDO.setPayType(payType);
orderInfoDO.setPayTime(payTime);
orderInfoDAO.updateById(orderInfoDO);
//支付明细信息
OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);
orderPaymentDetailDO.setPayTime(payTime);
orderPaymentDetailDO.setPayType(payType);
orderPaymentDetailDO.setOutTradeNo(outTradeNo);
orderPaymentDetailDAO.updateById(orderPaymentDetailDO);
//判断是否存在子订单
List<OrderInfoDO> subOrderInfoList = orderInfoDAO.listByParentOrderId(orderId);
if (subOrderInfoList != null && !subOrderInfoList.isEmpty()) {
for (OrderInfoDO subOrderInfoDO : subOrderInfoList) {
//更新子订单支付信息
subOrderInfoDO.setPayType(payType);
subOrderInfoDO.setPayTime(payTime);
orderInfoDAO.updateById(subOrderInfoDO);
//更新子订单支付明细信息
OrderPaymentDetailDO subOrderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(subOrderInfoDO.getOrderId());
subOrderPaymentDetailDO.setPayTime(payTime);
subOrderPaymentDetailDO.setPayType(payType);
subOrderPaymentDetailDO.setOutTradeNo(outTradeNo);
orderPaymentDetailDAO.updateById(subOrderPaymentDetailDO);
}
}
}
...
}
2.订单系统的支付回调业务流程
(1)支付回调接口的使用场景
(2)支付回调接口的具体实现
(1)支付回调接口的使用场景
此接⼝是⽀付系统完成了⽤户⽀付操作之后⾃动来回调的,不是前端应⽤来发起调⽤的。
(2)支付回调接口的具体实现
一.支付回调的处理流程
二.更新订单为已支付
三.发送订单已支付消息
步骤一:回调⼊参检查
步骤二:添加分布式锁(与预⽀付接⼝使⽤同⼀把锁)
步骤三:订单状态与⽀付信息判断
步骤四:异常场景判断
正常情况下会修改订单状态与⽀付状态为已⽀付,并记录操作⽇志。异常情况下则需要分类进⾏判断,做各种处理。
注意:由于⽤户可能发起多次预⽀付调⽤,所以预⽀付与⽀付回调两个接⼝需要使⽤同⼀把分布式锁,以此来保证并发数据处理安全。
一.支付回调的处理流程
@Service
public class OrderServiceImpl implements OrderService {
...
//支付回调
@Transactional(rollbackFor = Exception.class)
@Override
public void payCallback(PayCallbackRequest payCallbackRequest) {
//1.入参检查
checkPayCallbackRequestParam(payCallbackRequest);
String orderId = payCallbackRequest.getOrderId();
Integer payAmount = payCallbackRequest.getPayAmount();
Integer payType = payCallbackRequest.getPayType();
//2.加支付分布式锁避免支付系统并发回调
String key = RedisLockKeyConstants.ORDER_PAY_KEY + orderId;
boolean lock = redisLock.lock(key);
if (!lock) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_PAY_CALLBACK_ERROR);
}
try {
//从数据库中查询出当前订单信息
OrderInfoDO orderInfoDO = orderInfoDAO.getByOrderId(orderId);
OrderPaymentDetailDO orderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(orderId);
//3.校验参数
if (orderInfoDO == null || orderPaymentDetailDO == null) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_INFO_IS_NULL);
}
if (!payAmount.equals(orderInfoDO.getPayAmount())) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_CALLBACK_PAY_AMOUNT_ERROR);
}
//4.异常场景判断
Integer orderStatus = orderInfoDO.getOrderStatus();
if (OrderStatusEnum.CREATED.getCode().equals(orderStatus)) {
//如果订单状态是"已创建",直接更新订单状态为已支付
updateOrderStatusPaid(payCallbackRequest, orderInfoDO, orderPaymentDetailDO);
//发送"订单已完成支付"消息
sendPaidOrderSuccessMessage(orderInfoDO);
} else {
//如果订单状态不是"已创建"
if (OrderStatusEnum.CANCELED.getCode().equals(orderStatus)) {
//如果订单状态是取消状态
Integer payStatus = orderPaymentDetailDO.getPayStatus();
if (PayStatusEnum.UNPAID.getCode().equals(payStatus)) {
//调用退款
executeOrderRefund(orderInfoDO, orderPaymentDetailDO);
throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_ERROR);
} else if (PayStatusEnum.PAID.getCode().equals(payStatus)) {
if (payType.equals(orderPaymentDetailDO.getPayType())) {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_SAME_ERROR);
} else {
throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_PAY_TYPE_NO_SAME_ERROR);
}
}
} else {
//如果订单状态不是取消状态
if (PayStatusEnum.PAID.getCode().equals(orderPaymentDetailDO.getPayStatus())) {
if (payType.equals(orderPaymentDetailDO.getPayType())) {
return;
}
//调用退款
executeOrderRefund(orderInfoDO, orderPaymentDetailDO);
throw new OrderBizException(OrderErrorCodeEnum.ORDER_CANCEL_PAY_CALLBACK_REPEAT_ERROR);
}
}
}
} finally {
//释放分布式锁
redisLock.unlock(key);
}
}
...
}
//支付系统回调请求对象
@Data
public class PayCallbackRequest extends AbstractObject implements Serializable {
private static final long serialVersionUID = 3685085492927992753L;
private String orderId;//订单ID
private String payAccount;//支付账户
private Integer payAmount;//支付金额
private String outTradeNo;//支付系统交易单号
private Integer payType;//支付方式
private String merchantId;//商户号
private String payChannel;//支付渠道
private String appid;//微信平台 appid
}二.更新订单为已支付
@Service
public class OrderServiceImpl implements OrderService {
...
//更新订单状态为 已支付
private void updateOrderStatusPaid(PayCallbackRequest payCallbackRequest, OrderInfoDO orderInfoDO, OrderPaymentDetailDO orderPaymentDetailDO) {
//主单信息
String orderId = payCallbackRequest.getOrderId();
Integer preOrderStatus = orderInfoDO.getOrderStatus();
orderInfoDO.setOrderStatus(OrderStatusEnum.PAID.getCode());
orderInfoDAO.updateById(orderInfoDO);
//主单支付信息
orderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode());
orderPaymentDetailDAO.updateById(orderPaymentDetailDO);
//新增订单状态变更日志
OrderOperateLogDO orderOperateLogDO = new OrderOperateLogDO();
orderOperateLogDO.setOrderId(orderId);
orderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());
orderOperateLogDO.setPreStatus(preOrderStatus);
orderOperateLogDO.setCurrentStatus(orderInfoDO.getOrderStatus());
orderOperateLogDO.setRemark("订单支付回调操作" + orderOperateLogDO.getPreStatus() + "-" + orderOperateLogDO.getCurrentStatus());
orderOperateLogDAO.save(orderOperateLogDO);
//判断是否存在子订单
List<OrderInfoDO> subOrderInfoDOList = orderInfoDAO.listByParentOrderId(orderId);
if (subOrderInfoDOList != null && !subOrderInfoDOList.isEmpty()) {
//先将主订单状态设置为无效订单
Integer newPreOrderStatus = orderInfoDO.getOrderStatus();
orderInfoDO.setOrderStatus(OrderStatusEnum.INVALID.getCode());
orderInfoDAO.updateById(orderInfoDO);
//新增订单状态变更日志
OrderOperateLogDO newOrderOperateLogDO = new OrderOperateLogDO();
newOrderOperateLogDO.setOrderId(orderId);
newOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());
newOrderOperateLogDO.setPreStatus(newPreOrderStatus);
newOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.INVALID.getCode());
orderOperateLogDO.setRemark("订单支付回调操作,主订单状态变更" + newOrderOperateLogDO.getPreStatus() + "-" + newOrderOperateLogDO.getCurrentStatus());
orderOperateLogDAO.save(newOrderOperateLogDO);
//再更新子订单的状态
for (OrderInfoDO subOrderInfo : subOrderInfoDOList) {
Integer subPreOrderStatus = subOrderInfo.getOrderStatus();
subOrderInfo.setOrderStatus(OrderStatusEnum.PAID.getCode());
orderInfoDAO.updateById(subOrderInfo);
//更新子订单的支付明细状态
String subOrderId = subOrderInfo.getOrderId();
OrderPaymentDetailDO subOrderPaymentDetailDO = orderPaymentDetailDAO.getPaymentDetailByOrderId(subOrderId);
if (subOrderPaymentDetailDO != null) {
subOrderPaymentDetailDO.setPayStatus(PayStatusEnum.PAID.getCode());
orderPaymentDetailDAO.updateById(subOrderPaymentDetailDO);
}
//新增订单状态变更日志
OrderOperateLogDO subOrderOperateLogDO = new OrderOperateLogDO();
subOrderOperateLogDO.setOrderId(subOrderId);
subOrderOperateLogDO.setOperateType(OrderOperateTypeEnum.PAID_ORDER.getCode());
subOrderOperateLogDO.setPreStatus(subPreOrderStatus);
subOrderOperateLogDO.setCurrentStatus(OrderStatusEnum.PAID.getCode());
orderOperateLogDO.setRemark("订单支付回调操作,子订单状态变更" + subOrderOperateLogDO.getPreStatus() + "-" + subOrderOperateLogDO.getCurrentStatus());
orderOperateLogDAO.save(subOrderOperateLogDO);
}
}
}
...
}三.发送订单已支付消息
@Service
public class OrderServiceImpl implements OrderService {
...
//发送订单已完成支付消息,触发订单进行履约
private void sendPaidOrderSuccessMessage(OrderInfoDO orderInfoDO) {
String orderId = orderInfoDO.getOrderId();
PaidOrderSuccessMessage message = new PaidOrderSuccessMessage();
message.setOrderId(orderId);
String msgJson = JSON.toJSONString(message);
defaultProducer.sendMessage(RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC, msgJson, "订单已完成支付");
}
...
}
3.预支付和支付回调的并发问题
⽤户可能触发多次订单⽀付,也就是可能会多次调⽤预⽀付接⼝。预⽀付订单接⼝会更改订单与⽀付记录,⽀付回调也会判断订单⽀付状态。如下图示:
所以预⽀付与⽀付回调两个接⼝需要基于分布式锁来保证数据并发安全,采⽤Redisson框架来实现分布式锁。
4.Redisson分布式锁解决预支付和支付并发问题
//Redis分布式锁
public class RedisLock {
RedissonClient redissonClient;
public RedisLock(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
//互斥锁,seconds秒后自动失效
public boolean lock(String key, int seconds) {
RLock rLock = redissonClient.getLock(key);
if (rLock.isLocked()) {
return false;
}
rLock.lock(seconds, TimeUnit.SECONDS);
return true;
}
//互斥锁,自动续期
public boolean lock(String key) {
RLock rLock = redissonClient.getLock(key);
if (rLock.isLocked()) {
return false;
}
rLock.lock();
return true;
}
//手动释放锁
public void unlock(String key) {
RLock rLock = redissonClient.getLock(key);
if (rLock.isLocked()) {
rLock.unlock();
}
}
}
5.支付成功后推送订单到履约系统的实现
(1)推送订单到履约系统的使用场景
(2)推送订单到履约系统的业务流程
(1)推送订单到履约系统的使用场景
订单系统监听到"订单已⽀付"的消息时触发。
(2)推送订单到履约系统的业务流程
步骤一:订单系统监听RocketMQ的订单已⽀付消息
步骤二:订单系统收到订单已⽀付消息,根据消息中的orderId查询出订单
步骤三:校验订单是否已⽀付,如果不是已支付,那么就直接终⽌流程
步骤四:构造履约请求,调⽤履约系统的履约接⼝推送订单到履约系统
步骤五:如果调⽤失败,那么就抛出异常,触发RocketMQ的消息重复消费功能
步骤六:如果调⽤成功,那么就更新订单状态为已履约 + 插⼊⼀条订单变更记录
步骤七:⼿动提交订单已⽀付消息的ack
@Service
public class OrderServiceImpl implements OrderService {
...
//发送订单已完成支付消息,触发订单进行履约
private void sendPaidOrderSuccessMessage(OrderInfoDO orderInfoDO) {
String orderId = orderInfoDO.getOrderId();
PaidOrderSuccessMessage message = new PaidOrderSuccessMessage();
message.setOrderId(orderId);
String msgJson = JSON.toJSONString(message);
defaultProducer.sendMessage(RocketMqConstant.PAID_ORDER_SUCCESS_TOPIC, msgJson, "订单已完成支付");
}
...
}@Configurationpublic class ConsumerConfig { @Autowired private RocketMQProperties rocketMQProperties; //订单完成支付消息消费者 @Bean("paidOrderSuccessConsumer") public DefaultMQPushConsumer paidOrderSuccessConsumer(PaidOrderSuccessListener paidOrderSuccessListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(PAID_ORDER_SUCCESS_CONSUMER_GROUP); consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); consumer.subscribe(PAID_ORDER_SUCCESS_TOPIC, "*"); consumer.registerMessageListener(paidOrderSuccessListener); consumer.start(); return consumer; } ...}//监听订单支付成功的消息@Componentpublic class PaidOrderSuccessListener implements MessageListenerConcurrently { @Autowired RedisLock redisLock; @Autowired private OrderFulFillService orderFulFillService; @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class); String orderId = paidOrderSuccessMessage.getOrderId(); log.info("触发订单履约,orderId:{}", orderId); //1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费 String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId; boolean lock = redisLock.lock(key); if (!lock) { log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId); throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR); } try { //2.触发订单履约逻辑 //注意这里分布式锁加锁放在了本地事务外面 orderFulFillService.triggerOrderFulFill(orderId); } finally { if (lock) { redisLock.unlock(key); } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error("consumer error", e); //本地业务逻辑执行失败,触发消息重新消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }}@Servicepublic class OrderFulFillServiceImpl implements OrderFulFillService { ... @Transactional(rollbackFor = Exception.class) @Override public void triggerOrderFulFill(String orderId) throws OrderBizException { //1.获取订单数据 OrderInfoDO order = orderInfoDAO.getByOrderId(orderId); if (Objects.isNull(order)) { return; } //2.校验订单是否已支付 OrderStatusEnum orderStatus = OrderStatusEnum.getByCode(order.getOrderStatus()); if (!OrderStatusEnum.PAID.equals(orderStatus)) { log.info("order has not been paid,cannot fulfill, orderId={}", order.getOrderId()); return; } //3.推送订单至履约系统 JsonResult jsonResult = fulfillApi.receiveOrderFulFill(buildReceiveFulFillRequest(order)); if (!jsonResult.getSuccess()) { log.error("push order to fulfill-system error,orderId={}", order.getOrderId()); throw new OrderBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR); } //4.更新订单状态为:已履约 orderInfoDAO.updateOrderStatus(orderId, OrderStatusEnum.PAID.getCode(), OrderStatusEnum.FULFILL.getCode()); //5.并插入一条订单变更记录 orderOperateLogDAO.save(orderOperateLogFactory.get(order, OrderStatusChangeEnum.ORDER_FULFILLED)); // TODO 使用事务消息,解决:推送履约系统成功,但是执行本地事物失败的场景 } ...}
6.推送订单到履约系统时的消息丢失和消费重复问题
订单系统中可能存在消息丢失的三个业务环节,如下虚线所示:
解决方法如下:
环节一:支付回调后更新订单状态成功,但是发送订单支付成功的消息到MQ失败。此时可以通过使用RocketMQ的事务机制来解决。
环节二:消息消费时为了保证幂等,可以通过分布式锁 + 状态前置校验来实现。加分布式锁是为了防止并非请求进来后可能会避开状态的前置校验。加分布式锁应放在本地事务外,⽽不能将加锁包裹在本地事务⾥。
环节三:订单系统消费订单支付成功消息时,调用履约系统的接⼝失败了。此时需要抛出异常,并触发MQ的消息重复消费功能,重新执⾏订单履约流程。也就是利用MQ的重试队列 + 死信队列,确保消息能成功消费。
环节四:履约系统的接⼝调用成功了,但是更新订单状态为已履约的本地事务的执行失败了。此时需要保证调⽤履约系统接⼝和本地事务执⾏的最终⼀致性,这可以通过使⽤RocketMQ的事务机制实现。
7.推送订单到履约系统时面临的问题
问题一:(步骤2成功 + 步骤3失败)
支付回调后更新订单状态成功,但发送订单支付成功的消息到MQ失败。此时,可以使用RocketMQ的事务机制来解决。
问题二:(步骤4成功 + 步骤5失败)
订单系统消费订单支付成功的消息成功,但调⽤履约系统接⼝失败。此时,可以利用MQ的重试队列 + 死信队列,确保消息能成功消费。
问题三:(步骤5成功 + 步骤6失败)
订单系统调用履约系统的接口成功,但执行本地事务更新订单状态失败。此时,可以使用RocketMQ的事务机制来解决。
8.推送订单到履约系统失败后的MQ重试与死信机制
消费MQ的订单支付成功消息时,会try catch推送订单到履约系统的异常。如果发现抛出异常,就会返回MQ一个RECONSUME_LATER重新消费。
//监听订单支付成功的消息
@Component
public class PaidOrderSuccessListener implements MessageListenerConcurrently {
@Autowired
RedisLock redisLock;
@Autowired
private OrderFulFillService orderFulFillService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
String message = new String(messageExt.getBody());
PaidOrderSuccessMessage paidOrderSuccessMessage = JSON.parseObject(message, PaidOrderSuccessMessage.class);
String orderId = paidOrderSuccessMessage.getOrderId();
log.info("触发订单履约,orderId:{}", orderId);
//1.加分布式锁 + 里面的履约前置状态校验防止消息重复消费
String key = RedisLockKeyConstants.ORDER_FULFILL_KEY + orderId;
boolean lock = redisLock.lock(key);
if (!lock) {
log.error("order has not acquired lock,cannot fulfill, orderId={}", orderId);
throw new BaseBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR);
}
try {
//2.触发订单履约逻辑
//注意这里分布式锁加锁放在了本地事务外面
orderFulFillService.triggerOrderFulFill(orderId);
} finally {
if (lock) {
redisLock.unlock(key);
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("consumer error", e);
//本地业务逻辑执行失败,触发消息重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
@Service
public class OrderFulFillServiceImpl implements OrderFulFillService {
...
@Transactional(rollbackFor = Exception.class)
@Override
public void triggerOrderFulFill(String orderId) throws OrderBizException {
//1.获取订单数据
OrderInfoDO order = orderInfoDAO.getByOrderId(orderId);
if (Objects.isNull(order)) {
return;
}
//2.校验订单是否已支付
OrderStatusEnum orderStatus = OrderStatusEnum.getByCode(order.getOrderStatus());
if (!OrderStatusEnum.PAID.equals(orderStatus)) {
log.info("order has not been paid,cannot fulfill, orderId={}", order.getOrderId());
return;
}
//3.推送订单至履约系统
JsonResult<Boolean> jsonResult = fulfillApi.receiveOrderFulFill(buildReceiveFulFillRequest(order));
if (!jsonResult.getSuccess()) {
log.error("push order to fulfill-system error,orderId={}", order.getOrderId());
throw new OrderBizException(OrderErrorCodeEnum.ORDER_FULFILL_ERROR);
}
//4.更新订单状态为:已履约
orderInfoDAO.updateOrderStatus(orderId, OrderStatusEnum.PAID.getCode(), OrderStatusEnum.FULFILL.getCode());
//5.并插入一条订单变更记录
orderOperateLogDAO.save(orderOperateLogFactory.get(order, OrderStatusChangeEnum.ORDER_FULFILLED));
//TODO 使用事务消息解决:推送履约系统成功,但是执行本地事物失败的场景
}
...
}注意:MQ的一个Topic会分为3种队列:业务队列、重试队列、死信队列。发送到MQ的消息会放入业务队列中。消费MQ的消息失败时,响应MQ需要进行重试,消息则放入重试队列。在重试队列中重复消费16次(约2小时)都没能成功,消息则放入死信队列。死信队列的消息需要重新监听进行消费才能处理。
9.订单支付成功后的履约流程
(1)订单履约接口的使用场景
(2)订单履约接口的具体实现
(3)订单履约的全流程图
(1)订单履约接口的使用场景
订单系统触发履约流程时会调⽤该接⼝,将订单履约信息推送给履约系统。
(2)订单履约接口的具体实现
履约服务需要提供该接⼝进⾏后续的订单履约操作。在这⾥,履约系统将扮演"仓储+物流"的⻆⾊,会进⾏后续的商品捡货出库、商品打包邮寄配送、以及包裹签收的流程。具体如下所示:
@DubboService(version = "1.0.0", interfaceClass = FulfillApi.class, retries = 0)
public class FulfillApiImpl implements FulfillApi {
@Override
public JsonResult<Boolean> receiveOrderFulFill(ReceiveFulFillRequest request) {
...
log.info("接受订单履约成功,request={}", JSONObject.toJSONString(request));
return JsonResult.buildSuccess(true);
}
...
}
//接受订单履约请求
@Data
@Builder
public class ReceiveFulFillRequest implements Serializable {
private String orderId;//订单号
private String sellerId;//商家id
private String userId;//用户id
private Integer deliveryType;//配送类型,默认是自配送
private String receiverName;//收货人姓名
private String receiverPhone;//收货人电话
private String receiverProvince;//省
private String receiverCity;//市
private String receiverArea;//区
private String receiverStreetAddress;//街道地址
private String receiverDetailAddress;//详细地址
private BigDecimal receiverLon;//经度 六位小数点
private BigDecimal receiverLat;//纬度 六位小数点
private String shopRemark;//商家备注
private String userRemark;//用户备注
private Integer payType;//支付方式
private Integer payAmount;//付款总金额
private Integer totalAmount;//交易总金额
private Integer deliveryAmount;//运费
private List<ReceiveOrderItemRequest> receiveOrderItems;//订单商品明细
@Tolerate
public ReceiveFulFillRequest() {
}
}
//履约订单商品明细请求
@Data
@Builder
public class ReceiveOrderItemRequest implements Serializable {
private String skuCode;//商品id
private String productName;//商品名称
private Integer salePrice;//销售单价
private Integer saleQuantity;//销售数量
private String productUnit;//商品单位
private Integer payAmount;//付款金额
private Integer originAmount;//当前商品支付原总价
@Tolerate
public ReceiveOrderItemRequest() {
}
}(3)订单履约的全流程图
10.订单履约状态消息的乱序问题和消息强顺序方案
(1)RocketMQ的消息乱序问题
(2)RocketMQ的消息强顺序方案
(1)RocketMQ的消息乱序问题
同一个订单的各个履约状态消息写到MQ之后,会先被写入到CommitLog中,然后再被分散到不同的ConsumerQueue里。ConsumerQueue会记录消息在CommitLog中的offset。不同的ConsumerQueue又会被不同的线程或不同服务的进程来消费处理,从而导致同一个订单的多个履约状态消息可能出现乱序消费的问题。比如可能先消费了某个订单的物流配送消息,再消费该订单的出库消息。
(2)RocketMQ的消息强顺序方案
同一个订单ID的各个履约状态需要写入同一个ConsumerQueue,因为ConsumerQueue里的消息是会按顺序消费的,而这可以通过重写MQ的MessageQueueSelector的select()方法来实现。
@DubboService(version = "1.0.0", interfaceClass = FulfillApi.class, retries = 0)
public class FulfillApiImpl implements FulfillApi {
@Autowired
private DefaultProducer defaultProducer;
...
@Override
public JsonResult<Boolean> triggerOrderWmsShipEvent(String orderId, OrderStatusChangeEnum orderStatusChange, BaseWmsShipEvent wmsEvent) {
log.info("触发订单物流配送结果事件,orderId={}, orderStatusChange={}, wmsEvent={}", orderId, orderStatusChange, JSONObject.toJSONString(wmsEvent));
Message message = null;
String body = null;
if (OrderStatusChangeEnum.ORDER_OUT_STOCKED.equals(orderStatusChange)) {
message = new Message();
//订单已出库事件
OrderOutStockWmsEvent outStockEvent = (OrderOutStockWmsEvent) wmsEvent;
outStockEvent.setOrderId(orderId);
//构建订单已出库消息体
OrderEvent<OrderOutStockWmsEvent> orderEvent = buildOrderEvent(orderId, OrderStatusChangeEnum.ORDER_OUT_STOCKED, outStockEvent, OrderOutStockWmsEvent.class);
body = JSONObject.toJSONString(orderEvent);
} else if (OrderStatusChangeEnum.ORDER_DELIVERED.equals(orderStatusChange)) {
message = new Message();
//订单已配送事件
OrderDeliveredWmsEvent deliveredWmsEvent = (OrderDeliveredWmsEvent) wmsEvent;
deliveredWmsEvent.setOrderId(orderId);
//构建订单已配送消息体
OrderEvent<OrderDeliveredWmsEvent> orderEvent = buildOrderEvent(orderId, OrderStatusChangeEnum.ORDER_DELIVERED, deliveredWmsEvent, OrderDeliveredWmsEvent.class);
body = JSONObject.toJSONString(orderEvent);
} else if (OrderStatusChangeEnum.ORDER_SIGNED.equals(orderStatusChange)) {
message = new Message();
//订单已签收事件
OrderSignedWmsEvent signedWmsEvent = (OrderSignedWmsEvent) wmsEvent;
signedWmsEvent.setOrderId(orderId);
//构建订单已签收消息体
OrderEvent<OrderSignedWmsEvent> orderEvent = buildOrderEvent(orderId, OrderStatusChangeEnum.ORDER_SIGNED, signedWmsEvent, OrderSignedWmsEvent.class);
body = JSONObject.toJSONString(orderEvent);
}
if (null != message) {
message.setTopic(RocketMqConstant.ORDER_WMS_SHIP_RESULT_TOPIC);
message.setBody(body.getBytes(StandardCharsets.UTF_8));
try {
DefaultMQProducer defaultMQProducer = defaultProducer.getProducer();
SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() {
//消息强顺序方案
@Override
public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
//根据订单id选择发送queue
String orderId = (String) arg;
long index = hash(orderId) % mqs.size();
return mqs.get((int) index);
}
}, orderId);
log.info("send order wms ship result message finished,SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body);
} catch (Exception e) {
log.error("send order wms ship result message error, orderId={}, err={}", orderId, e.getMessage(), e);
}
}
return JsonResult.buildSuccess(true);
}
...
}
11.订单履约状态消息的挂起重试和幂等处理
(1)订单系统对履约状态消息的业务处理
(2)订单系统消费履约状态消息时的关键处理
(1)订单系统对履约状态消息的业务处理
情况一:订单系统监听到"订单已出库消息"
首先,校验订单状态,如果订单状态不是已履约,流程直接结束。然后,订单配送表增加出库时间。接着,更新订单状态为已出库,并增加⼀条操作⽇志。
情况二:订单系统监听到"订单已配送消息"
首先,校验订单状态,如果订单状态不是已出库,流程直接结束。然后,订单配送表增加配送员信息(编号、⼿机、姓名)。接着,更新订单状态为已配送,并增加⼀条操作⽇志。
情况三:订单系统监听到"订单已签收消息"
首先,校验订单状态,如果订单状态不是已签收,流程直接结束。然后,订单配送表增加签收时间。接着,更新订单状态为签收,并增加⼀条操作⽇志。
(2)订单系统消费履约状态消息时的关键处理
一.消费失败时进行挂起重试处理
二.消息消费的幂等处理
三.模版方法模式的应用
一.消费失败时进行挂起重试处理
本地业务逻辑需要加数据库事务。当本地事务执⾏失败时需要抛异常,触发MQ的消息重复消费功能。此时返回如下挂起重试响应给MQ,先挂起该消费线程,然后再重新消费消息执⾏订单履约流程。
挂起重试响应:SUSPEND_CURRENT_QUEUE_A_MOMENT
重复消费响应:RECONSUME_LATER由于同一个订单的已出库、已配送、已签收消息需要顺序消费,所以如果同一个订单的已出库消息消费异常,就不能继续往下消费,也不能直接返回重复消费响应给MQ。而是需要返回上述挂起重试响应给MQ,即挂起当前消费线程,然后再重复消费该消息,直到成功才继续消费。
二.消息消费的幂等处理
通过分布式锁 + 状态前置校验实现。加分布式锁是为了防止并非请求进来后可能会避开状态的前置校验。加分布式锁应该放在本地事务外,不能将加锁包裹在本地事务⾥。
三.模版方法模式的应用
通过模板⽅法模式规范对已出库、已配送、已签收的处理流程。
//消费订单物流配送结果消息@Componentpublic class OrderWmsShipResultListener implements MessageListenerOrderly { @Autowired private RedisLock redisLock; @Autowired private OrderFulFillService orderFulFillService; @Override public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { OrderEvent orderEvent; try { for (MessageExt messageExt : list) { String message = new String(messageExt.getBody()); log.info("received orderWmsShopResultmessage:{}", message); orderEvent = JSONObject.parseObject(message, OrderEvent.class); //1.解析消息 WmsShipDTO wmsShipDTO = buildWmsShip(orderEvent); //2.加分布式锁 + 里面的前置状态校验防止消息重复消费 String key = RedisLockKeyConstants.ORDER_WMS_RESULT_KEY + wmsShipDTO.getOrderId(); boolean lock = redisLock.lock(key); if (!lock) { log.error("order has not acquired lock,cannot inform order wms result, orderId={}", wmsShipDTO.getOrderId()); throw new BaseBizException(OrderErrorCodeEnum.ORDER_NOT_ALLOW_INFORM_WMS_RESULT); } //3.通知订单物流结果 //注意这里分布式锁加锁放在了本地事务外面 try { orderFulFillService.informOrderWmsShipResult(wmsShipDTO); } finally { if (lock) { redisLock.unlock(key); } } } return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { //处理业务逻辑失败! Suspend current queue a moment return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } private WmsShipDTO buildWmsShip(OrderEvent orderEvent) { String messageContent = JSONObject.toJSONString(orderEvent.getMessageContent()); WmsShipDTO wmsShipDTO = new WmsShipDTO(); wmsShipDTO.setStatusChange(orderEvent.getOrderStatusChange()); if (OrderStatusChangeEnum.ORDER_OUT_STOCKED.equals(orderEvent.getOrderStatusChange())) { //订单已出库消息 OrderOutStockWmsEvent outStockWmsEvent = JSONObject.parseObject(messageContent, OrderOutStockWmsEvent.class); BeanCopierUtil.copyProperties(outStockWmsEvent, wmsShipDTO); } else if (OrderStatusChangeEnum.ORDER_DELIVERED.equals(orderEvent.getOrderStatusChange())) { //订单已配送消息 OrderDeliveredWmsEvent deliveredWmsEvent = JSONObject.parseObject(messageContent, OrderDeliveredWmsEvent.class); BeanCopierUtil.copyProperties(deliveredWmsEvent, wmsShipDTO); } else if (OrderStatusChangeEnum.ORDER_SIGNED.equals(orderEvent.getOrderStatusChange())) { //订单已签收消息 OrderSignedWmsEvent signedWmsEvent = JSONObject.parseObject(messageContent, OrderSignedWmsEvent.class); BeanCopierUtil.copyProperties(signedWmsEvent, wmsShipDTO); } return wmsShipDTO; }}@Servicepublic class OrderFulFillServiceImpl implements OrderFulFillService { @Autowired private SpringApplicationContext springApplicationContext; ... @Override public void informOrderWmsShipResult(WmsShipDTO wmsShipDTO) throws OrderBizException { //1.获取对应的订单物流结果处理器 OrderWmsShipResultProcessor processor = getProcessor(wmsShipDTO.getStatusChange()); //2.执行 if (null != processor) { processor.execute(wmsShipDTO); } } //获取对应的订单物流结果处理器 private OrderWmsShipResultProcessor getProcessor(OrderStatusChangeEnum orderStatusChange) { if (OrderStatusChangeEnum.ORDER_OUT_STOCKED.equals(orderStatusChange)) { return springApplicationContext.getBean(OrderOutStockedProcessor.class); } else if (OrderStatusChangeEnum.ORDER_DELIVERED.equals(orderStatusChange)) { return springApplicationContext.getBean(OrderDeliveredProcessor.class); } else if (OrderStatusChangeEnum.ORDER_SIGNED.equals(orderStatusChange)) { return springApplicationContext.getBean(OrderSignedProcessor.class); } return null; } ...}//Spring IOC容器组件public class SpringApplicationContext { //Spring容器 private ApplicationContext context; //构造函数 public SpringApplicationContext(ApplicationContext context) { this.context = context; } //获取bean publicT getBean(Class
页:
[1]