找回密码
 立即注册
首页 业界区 安全 消息队列上篇--基础

消息队列上篇--基础

扎先 2025-7-2 18:01:36
消息队列--上篇--入门

本文示例代码见GITEE
地址:https://gitee.com/quercus-sp204/qmm-study/tree/master/reslove-mq 【reslove-mq模块部分】
0.消息队列

消息队列(Message Queue)是一种中间件组件,用于在不同应用或服务之间以异步方式传递数据包或“消息”,并在接收方准备好之前将其暂存于队列中,保证数据不丢失并按顺序处理,常常在分布式系统或微服务架构中,实现应用组件之间松耦合且可靠的数据传递。其核心价值在于解耦生产者与消费者、提高系统的可靠性、可伸缩性和弹性,并在高并发或网络不稳定情况下保证消息不丢失。常见中间件包括 RabbitMQ、Apache Kafka、RocketMQ,本文就以RocketMQ5.3.2为主体,来探索一下消息队列。
微服务架构中:各微服务通过消息队列解耦,实现异步任务、事件驱动(Event-Driven)和领域事件通知;
电商系统中:用户下单后需要发送短信通知、更新积分、记录日志等非核心操作。将这些操作放入消息队列,主线程无需等待子任务完成即可立即响应,显著提升系统处理速度。还比如,电商下单流程中,订单创建成功后通过消息队列异步处理库存更新、支付通知等,缩短用户等待时间。还比如,当秒杀活动、促销期间短时间内大量请求涌入时,消息队列作为 “缓冲池”,将突发流量存入队列,接收方按自身处理能力消费消息,避免系统因瞬间压力过大而崩溃。
在日志收集与分析系统中:各服务日志写入队列,由专门的日志处理组件异步消费并持久化到 ELK 等系统中;
在物联网(IoT)设备通信中:大量终端设备产生数据,通过消息队列汇聚后再统一处理和存储,有效缓解突发流量;
首先,假设我们不知道有上述的mq产品,如果说我们想要自己实现一个消息中间件,我们需要考虑这个消息中间件模型呢?
1.png

市面上主要的消息中间件,大致模型都是如上图所示:

  • 消息的产生【生产者】:他们负责产生消息,给我们设计的消息中间件;
  • MQ层:它可以接口生产者发送过来的消息,存到“内存”/“磁盘”中,(这里为什么要存到磁盘中?因为如果重启的话,内存数据会丢失,以此来达到持久化的目的),然后根据某种关系可以把消息主动推送给想要这个消息的对象;
  • 消费者层:它与我们设计的消息中间件建立某种意义上的联系,可以主动拉取/被动推送 自己感兴趣的消息。
一个良好的消息队列需要具备哪些好的设计呢?
1.消息队列中最重要的肯定就是“消息”了,可靠消息传递,保证数据不丢失这个是至关重要的,消息队列中间件支持持久化存储、重试机制、确认机制(ACK)等,确保消息被可靠处理; 2.能够支持集群,保证高可用;
那么,马上就来到了喜闻乐见的比较环节:【笔者只用过第一二两款消息中间件】
中间件协议支持吞吐能力持久化方式消息排序可伸缩性典型应用场景RocketMQ自定义协议,支持 JMS、MQTT、HTTP REST高(优化后数十万–百万级 TPS)高性能 Journal(LevelDB、KahaDB)、JDBC支持独占队列/顺序消费分布式高可用集群,多主题分区大规模电商事件、事务消息、金融级消息中间件RabbitMQAMQP、MQTT、STOMP、HTTP,Streams 插件中等(数万–数十万 msg/s)可选磁盘持久化(持久队列)队列级顺序保障镜像队列集群,水平扩展较有限企业级消息、分布式任务、IoT 消息传递KafkaKafka 原生二进制协议高(可达百万级 msg/s)持久化日志(段文件、可配置保留策略)分区内顺序保障基于分区的水平扩展,可横向无限扩容流处理、日志聚合、实时分析ActiveMQOpenWire、AMQP、MQTT、STOMP、JMS中等(数十万 msg/s)KahaDB、LevelDB、Artemis 日志、JDBC队列级顺序 + JMS 优先级/独占消费Broker 网络集群,扩展性次于 KafkaJava EE 集成、企业应用消息中间件RocketMQ的搭建与部署本文就不说明了,在官网可以看到详细步骤:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart
部署看这个:https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy
但是由于照着官网来的教程都有问题:笔者在此写一些重要的部署内容“,【本文是以虚拟机Local本地部署的,单组节点单副本模式】
第一,下载rocketmq然后上传到服务器我这里就不严实了。给出主要启动命令【本文以版本5.3.2为例子】
  1. ##=========================mq bin/
  2. 1.启动NameServer:
  3. # 后台启动----------------------------------
  4. nohup sh mqnamesrv > myNamesrv.out 2>&1 &
  5. 2.启动Broker+proxy:
  6. # 后台启动---------------------------------
  7. nohup sh mqbroker -n localhost:9876 --enable-proxy > mybroker.out 2>&1 &
  8. ##=========================console--这个是控制台项目打包的jar
  9. # idea打成jar包之后,上传到机器上面即可,然后在该jar包执行命令运行jar包
  10. 3.启动console-dashboard
  11. nohup java -jar dashboard-rocketmq-2.0.0.jar > log.out 2>&1 &
  12. sh mqshutdown namesrv //关闭NameServer命令
  13. sh mqshutdown broker //关闭Broker命令
  14. firewall-cmd --zone=public --add-port=8081/tcp --permanent #记得开放8081端口
  15. #记得开放8090端口,笔者的控制台jar包时8090端口,所以开放8090,方便查看控制台
  16. firewall-cmd --zone=public --add-port=8090/tcp --permanent
  17. firewall-cmd --zone=public --add-port=9876/tcp --permanent
  18. firewall-cmd --reload
  19. # 下面是多余的,不用看了
  20. firewall-cmd --zone=public --add-port=8080/tcp --permanent
  21. firewall-cmd --zone=public --add-port=9876/tcp --permanent
  22. firewall-cmd --zone=public --add-port=10911/tcp --permanent
  23. firewall-cmd --zone=public --add-port=11011/tcp --permanent
  24. firewall-cmd --zone=public --add-port=10909/tcp --permanent
复制代码
注意点: dashboard控制台项目记得把server.port换成8090,其实只要是不与后面俩端口冲突就行了,不要8080/8081, 8080/8081留给proxy这个进程如下图:
2.png

第二,注意官网这一句话,Client连接的时候,是Proxy的,笔者经过第一步里面的lsof命令,找到笔者机器的proxy是8081端口。
3.png

第三,控制台界面要切换到5.x版本
4.png

1.RocketMQ相关介绍

大部分摘选自官网
RocketMQ的领域模型:(来自官网)
5.png

可以看到RocketMQ的模型和生产者-队列-消费者模型类似。只不过RocketMQ中间那一层是叫做Topic的。生产者这一端没有什么好说的,主要来看一下中间消息存储端和消费者端。

  • 消息存储:[ MQ里面当然是允许多个Topic存在的 ]
主题(Topic):
Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
队列(MessageQueue):
Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
消息(Message):
Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。


  • 消费者
消费者分组(ConsumerGroup):
Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
消费者(Consumer):
Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
订阅关系(Subscription):
Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。
Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。
消息的传输模型:
点对点模型:

  • 消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者从队列获取消息无法申明独立身份。
  • 一对一通信:基于消费匿名特点,下游消费者即使有多个,但都没有自己独立的身份,因此共享队列中的消息每一条消息都只会被唯一一个消费者处理。因此点对点模型只能实现一对一通信。
发布订阅模型:

  • 消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
  • 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
RocketMQ就是采用的发布订阅模型。
2.基本功能

①消息发送


  • 普通消息:创建主题的时候,Topic的消息类型要是普通消息【message.type=NORMAL】
发送消息时,建议设置业务上唯一的信息作为索引,方便后续快速定位消息。例如,订单ID,用户ID等。
6.png
  1. public static void sendSyncMessage() throws ClientException, IOException {
  2.         // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
  3.          String endpoint = "192.168.110.128:8081";
  4.         // 消息发送的目标Topic名称,需要提前创建。
  5.         String topic = Topic.DEMO_TOPIC;
  6.         ClientServiceProvider provider = ClientServiceProvider.loadService();
  7.         ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
  8.                 .enableSsl(false);
  9.         ClientConfiguration configuration = builder.build();
  10.         // 初始化Producer时需要设置通信配置以及预绑定的Topic。
  11.         Producer producer = provider.newProducerBuilder()
  12.                 .setTopics(topic)
  13.                 .setClientConfiguration(configuration)
  14.                 .build();
  15.         // 普通消息发送。
  16.         Message message = provider.newMessageBuilder()
  17.                 .setTopic(topic)
  18.                 // 设置消息索引键,可根据关键字精确查找某条消息。
  19.                 .setKeys("messageKey")
  20.                 // 设置消息Tag,用于消费端根据指定Tag过滤消息。
  21.                 .setTag("messageTag")
  22.                 // 消息体。
  23.                 .setBody("我是消息体".getBytes())
  24.                 .build();
  25.         try {
  26.             // 发送消息,需要关注发送结果,并捕获失败等异常。
  27.             SendReceipt sendReceipt = producer.send(message);
  28.             System.out.println("Send message successfully, messageId=: "+  sendReceipt.getMessageId());
  29.         } catch (ClientException e) {
  30.             System.out.println("Failed to send message" + e);;
  31.         }
  32.          producer.close();
  33.     }
复制代码

  • 定时/延时消息:同理,注意Topic的消息类型。定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
典型场景:分布式定时调度任务超时处理
  1. public static void sendDelayMessage() throws ClientException, IOException {
  2.     // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
  3.     String endpoint = "192.168.110.128:8081";
  4.     // 消息发送的目标Topic名称,需要提前创建。
  5.     String topic = Topic.DELAY_TOPIC;
  6.     ClientServiceProvider provider = ClientServiceProvider.loadService();
  7.     ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
  8.             .enableSsl(false);
  9.     ClientConfiguration configuration = builder.build();
  10.     // 初始化Producer时需要设置通信配置以及预绑定的Topic。
  11.     Producer producer = provider.newProducerBuilder()
  12.             .setTopics(topic)
  13.             .setClientConfiguration(configuration)
  14.             .build();
  15.     // 普通消息发送。
  16.     //以下示例表示:延迟时间为1分钟之后的Unix时间戳。
  17.     long deliverTimeStamp = System.currentTimeMillis() + 60 * 1000;
  18.     Message message = provider.newMessageBuilder()
  19.             .setTopic(topic)
  20.             // 设置消息索引键,可根据关键字精确查找某条消息。
  21.             .setKeys("delay-messageKey")
  22.             // 设置消息Tag,用于消费端根据指定Tag过滤消息。
  23.             .setTag("delay-messageTag")
  24.             .setDeliveryTimestamp(deliverTimeStamp)
  25.             // 消息体。
  26.             .setBody("我是延时消息消息体".getBytes())
  27.             .build();
  28.     try {
  29.         // 发送消息,需要关注发送结果,并捕获失败等异常。
  30.         SendReceipt sendReceipt = producer.send(message);
  31.         System.out.println("Send delay message ok, messageId=: "+  sendReceipt.getMessageId());
  32.     } catch (ClientException e) {
  33.         System.out.println("Failed to send message" + e);;
  34.     }
  35.     producer.close();
  36. }
复制代码

  • 顺序消息顺序消息是指在消息队列中,消息按照生产者发送的顺序被消费者接收和处理的一种消息类型。其核心目的是确保消息在逻辑关联性强的场景下,严格按照业务预期的顺序执行,避免因并发或异步处理导致的数据不一致问题。【和普通消息发送相比,顺序消息发送必须要设置消息组。】
普通消息消费者处理的顺序可能并不相同,以普通消息为例子:
  1. // 发送10条消息
  2. for (int i = 0; i < 10; i++) {
  3.     // 普通消息发送。
  4.     Message message = provider.newMessageBuilder()
  5.             .setTopic(topic)
  6.             // 设置消息索引键,可根据关键字精确查找某条消息。
  7.             .setKeys("messageKey" + i)
  8.             // 设置消息Tag,用于消费端根据指定Tag过滤消息。
  9.             .setTag("messageTag" + i )
  10.             // 消息体。
  11.             .setBody(("我是消息体 " + i).getBytes())
  12.             .build();
  13.     SendReceipt sendReceipt = producer.send(message);
  14.     System.out.println("Send message successfully, messageId=: "+  sendReceipt.getMessageId());
  15. }
  16. // 然后消费者来消费消息:
  17. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000000【】我是消息体 0
  18. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000002【】我是消息体 2
  19. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000004【】我是消息体 4
  20. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000007【】我是消息体 7
  21. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000005【】我是消息体 5
  22. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000008【】我是消息体 8
  23. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000009【】我是消息体 9
  24. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000001【】我是消息体 1
  25. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000006【】我是消息体 6
  26. 消费者1 【普通消息】 successfully, messageId=0100FF10576F2983B0084061DF00000003【】我是消息体 3
复制代码
可以看到消费者,消费消息的顺序是乱序的。在有些场景里面,以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。
第二个场景,以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。
示例:
  1. public static void sendOrderMessage() throws Exception {
  2.     // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8080;xxx:8081。
  3.     String endpoint = "192.168.110.128:8081";
  4.     // 消息发送的目标Topic名称,需要提前创建。
  5.     String topic = Topic.ORDER_TOPIC;
  6.     ClientServiceProvider provider = ClientServiceProvider.loadService();
  7.     ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint)
  8.             .enableSsl(false);
  9.     ClientConfiguration configuration = builder.build();
  10.     // 初始化Producer时需要设置通信配置以及预绑定的Topic。
  11.     Producer producer = provider.newProducerBuilder()
  12.             .setTopics(topic)
  13.             .setClientConfiguration(configuration)
  14.             .build();
  15.     try {
  16.         // 发送消息,需要关注发送结果,并捕获失败等异常。
  17.         for (int i = 0; i < 10; i++) {
  18.             // 普通消息发送。
  19.             Message message = provider.newMessageBuilder()
  20.                     .setTopic(topic)
  21.                     // 设置消息索引键,可根据关键字精确查找某条消息。
  22.                     .setKeys("order-messageKey" + i)
  23.                     // 设置消息Tag,用于消费端根据指定Tag过滤消息。
  24.                     .setTag("order-messageTag" + i )
  25.                     //设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
  26.                     .setMessageGroup("fifoGroup001") //v【和普通消息发送相比,顺序消息发送必须要设置消息组】
  27.                     // 消息体。
  28.                     .setBody(("我是顺序消息体 " + i).getBytes())
  29.                     .build();
  30.             SendReceipt sendReceipt = producer.send(message);
  31.             System.out.println("Send message successfully, messageId=: "+  sendReceipt.getMessageId());
  32.         }
  33.     } catch (ClientException e) {
  34.         System.out.println("Failed to send message" + e);;
  35.     }
  36.     producer.close();
  37. }
  38. // 消费者
  39. public static void simpleOrderConsumer() throws ClientException {
  40.     final ClientServiceProvider provider = ClientServiceProvider.loadService();
  41.     // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081。
  42.     String endpoints = "192.168.110.128:8081";
  43.     ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
  44.             .setEndpoints(endpoints)
  45.             .build();
  46.     // 订阅消息的过滤规则,表示订阅所有Tag的消息。
  47.     String tag = "*";
  48.     FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
  49.     // 为消费者指定所属的消费者分组,Group需要提前创建。
  50.     String consumerGroup = "Order-Consumer-Group";
  51.     // 指定需要订阅哪个目标Topic,Topic需要提前创建。
  52.     String topic = Topic.ORDER_TOPIC;
  53.     // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
  54.     PushConsumer pushConsumer = provider.newPushConsumerBuilder()
  55.             .setClientConfiguration(clientConfiguration)
  56.             // 设置消费者分组。
  57.             .setConsumerGroup(consumerGroup)
  58.             // 设置预绑定的订阅关系。
  59.             .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
  60.             // 设置消费监听器。
  61.             .setMessageListener(messageView -> {
  62.                 // 处理消息并返回消费结果。
  63.                 ByteBuffer body = messageView.getBody();
  64.                 int remaining = body.remaining();
  65.                 byte[] content = new byte[remaining];
  66.                 body.get(content);
  67.                 String str = new String(content);
  68.                 System.out.println("消费者1 【顺序消息】 successfully, messageId=" + messageView.getMessageId() + "【】" +str);
  69.                 return ConsumeResult.SUCCESS;
  70.             })
  71.             .build();
  72.     try {
  73.         Thread.sleep(30_000);
  74.     } catch (InterruptedException e) {
  75.         throw new RuntimeException(e);
  76.     }
  77.     try {
  78.         // 如果不需要再使用 PushConsumer,可关闭该实例。
  79.         pushConsumer.close();
  80.     } catch (IOException e) {
  81.         throw new RuntimeException(e);
  82.     }
  83. }
  84. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000000【】我是顺序消息体 0
  85. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000001【】我是顺序消息体 1
  86. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000002【】我是顺序消息体 2
  87. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000003【】我是顺序消息体 3
  88. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000004【】我是顺序消息体 4
  89. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000005【】我是顺序消息体 5
  90. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000006【】我是顺序消息体 6
  91. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000007【】我是顺序消息体 7
  92. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000008【】我是顺序消息体 8
  93. 消费者1 【顺序消息】 successfully, messageId=0100FF10576F2973F40840624B00000009【】我是顺序消息体 9
复制代码
可以看到,严格有序的。
顺序消息原理:顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
要保证顺序性,需要做到以下几点:

  • 生产顺序性:单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中,相同消息组的消息按照先后顺序被存储在同一个队列。不同消息组的消息可以混合在同一个队列中,且不保证连续。
  • 消费顺序性:投递顺序,RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序;有限重试,RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理,对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。
+++

  • 事务消息:这个就涉及到分布式事务了,读者们先见官网吧:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage,这个分布式事务,我认为是分布式系统中的最难问题了。本文仅作简单探讨。
②消息(可靠性保证&消费)

在分布式系统中,消息中间件的可靠性是保障业务数据一致性和系统健壮性的关键。RocketMQ 通过多层次的机制确保消息从生产者发送Broker存储再到消费者消费的整个生命周期中不丢失、不重复且有序传递。

  • 生产者端消息发送可靠性保证:内置消息发送重试机制
触发消息发送重试机制的条件如下:客户端消息发送请求调用失败或请求超时网络异常造成连接失败或请求超时;服务端节点处于重启或下线等状态造成连接失败;服务端运行慢造成请求超时;服务端返回失败错误码,系统逻辑错误:因运行逻辑不正确造成的错误,系统流控错误【这个重试有点不一样,见官网】:因容量超限造成的流控错误。
  1. 1.同步发送模式
  2. 生产者发送消息后阻塞等待Broker的确认响应(SendResult),若未收到ACK或收到失败响应,触发重试机制。
  3. 【这个是最可靠的方式。生产者在发送消息后,会等待 Broker 的确认(ack),确认消息成功到达 Broker 后,生产者才会认为发送成功。】
  4. SendResult sendResult = producer.send(msg);
  5. if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
  6.     System.out.println("消息发送成功");
  7. }
  8. 确保消息成功到达Broker,避免异步发送因网络抖动导致的消息丢失。
  9.    
  10. 在构建生产者的时候,
  11. Producer producer = provider.newProducerBuilder()
  12.         .setTopics(topic)
  13.         .setMaxAttempts(3) // 重试次数默认是3次
  14.         .setClientConfiguration(configuration)
  15.         .build();
  16. 2.异步发送模式
  17. 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回。
  18. CompletableFuture<SendReceipt> future = producer.sendAsync(message);
  19. future.whenCompleteAsync((sendReceipt1, throwable) -> {
  20.     if (throwable != null) {
  21.         // 消息发送失败,原因等异常。
  22.         System.out.println("Failed to send message");
  23.     } else {
  24.         // 消息发送成功。
  25.         System.out.println("Send message successfully, messageId=: "+  sendReceipt1.getMessageId());
  26.     }
  27. });
复制代码

  • Broker端消息可靠性保证
  1. 1.消息持久化
  2. 刷盘策略:
  3. - 同步刷盘(FlushDiskType.SYNC_FLUSH):消息写入内存后立即刷盘,保证宕机不丢数据,但性能较低。
  4. - 异步刷盘(FlushDiskType.ASYNC_FLUSH):定期批量刷盘,性能高但存在数据丢失风险(默认配置)。
  5. 配置建议:金融级场景建议同步刷盘,一般场景异步刷盘+主从复制。
  6.    
  7. 2.主从复制
  8. 复制模式:
  9. - 同步复制:消息写入主节点后需同步到从节点才返回成功,保证数据强一致。
  10. - 异步复制:主节点写入成功即返回,从节点异步复制(性能更高,默认模式)。
  11.    
  12. broker角色配置:
  13. brokerRole        默认ASYNC_MASTER        可选值:SYNC_MASTER,ASYNC_MASTER,SLAVE
  14. broker刷盘类型:
  15. flushDiskType  默认ASYNC_FLUSH        可选值:SYNC_FLUSH,ASYNC_FLUSH
  16. SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。
  17. ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。
复制代码

  • 消费者端消息可靠性保证
推荐使用消息重试场景如下:

  • 业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
  • 消费失败的原因不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞。
消费者可靠性见第三章吧。
3.实操及相关分析

本节就使用Springboot2.7.18 整合 RocketMQ,来模拟平时的一些场景。
3.1 订单超时关闭

这个场景用到它还是比较合适的,就用他的延时消息,官网也说到了这个场景的:以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发。
我们先创建延时相关的主题,然后见如下代码示例:
  1. @RestController
  2. public class OrderExpireController {
  3.     @Resource
  4.     private OrderExpireProducer orderExpireProducer;
  5.     @PostMapping("/orderSubmit")
  6.     public R orderSubmit(String orderId) {
  7.         // 模拟订单超时关闭
  8.         // 1.mq发送一条延迟消息,延迟时间60秒
  9.         orderExpireProducer.sendOrderExpireMessage(orderId, 60);
  10.         return R.success().setData("msg", "订单提交成功,请耐心等待");
  11.     }
  12. }
  13. // 生产者-----------------
  14. @Service
  15. @Slf4j
  16. public class OrderExpireProducer {
  17.     @Resource
  18.     private RocketMQTemplate rocketMQTemplate;
  19.     public void sendOrderExpireMessage(String message, long delayTime) {
  20.         log.info("订单延迟消息发送,订单ID:{},延迟时间:{}", message, delayTime);
  21.         message = "[订单号]" + message + "|" + delayTime;
  22.         SendResult sendResult = null;
  23.         int retryCount = 0;
  24.         // 这里是模拟重试!!!要注意!!!
  25.         do {
  26.             try {
  27.                 sendResult = rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstant.ORDER_EXPIRE_TOPIC, message, delayTime);
  28.                 //sendResult = rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstant.ORDER_EXPIRE_TOPIC, message, 0); // 模拟错误
  29.             } catch (Exception e) {
  30.                 log.error("订单延迟消息发送出现异常,订单ID:{},错误信息:{}", message, e.getMessage());
  31.             }
  32.             if ( sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK ) {
  33.                 log.info("订单延迟消息发送成功,订单ID:{}", message); break;
  34.             } else {
  35.                 log.error("订单延迟消息发送失败,订单ID:{}", message);
  36.                 // 可以考虑将此消息存起来,或者重试.....
  37.             }
  38.             retryCount++;
  39.         } while (sendResult == null && retryCount < 3);
  40.     }
  41. }
  42. // 消费者----------------------
  43. @Service
  44. @Slf4j
  45. @RocketMQMessageListener(
  46.         topic = TopicConstant.ORDER_EXPIRE_TOPIC,
  47.         consumerGroup = ConsumerGroup.ORDER_EXPIRE_GROUP,
  48.        /*
  49.         默认集群消费模式:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。
  50.         广播消费模式:当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。
  51.         */
  52.         messageModel = MessageModel.CLUSTERING
  53. )
  54. public class OrderExpireConsumer implements RocketMQListener<String> {
  55.     public void onMessage(String message) {
  56.         log.info("【订单过期消息消费者】收到消息:{}", message);
  57.         log.info("【订单过期消息消费者】处理订单消息开始......修改订单状态..等等操作");
  58.         // 处理订单过期消息.....
  59.     }
  60. }
  61. 2025-06-27 15:09:42.575  INFO 5192 --- [nio-8900-exec-1] c.f.m.r.producer1.OrderExpireProducer    : 订单延迟消息发送,订单ID:123456789,延迟时间:60
  62. 2025-06-27 15:09:42.664  INFO 5192 --- [nio-8900-exec-1] c.f.m.r.producer1.OrderExpireProducer    : 订单延迟消息发送成功,订单ID:[订单号]123456789|60
  63. 2025-06-27 15:10:44.026  INFO 5192 --- [-expire-group_1] c.f.m.r.consumer1.OrderExpireConsumer    : 【订单过期消息消费者】收到消息:[订单号]123456789|60
  64. 2025-06-27 15:10:44.026  INFO 5192 --- [-expire-group_1] c.f.m.r.consumer1.OrderExpireConsumer    : 【订单过期消息消费者】处理订单消息开始......修改订单状态..等等操作
复制代码
现在有一个疑问,那就是如果只有一个消费者,我现在发送了两条延时消息,会导致第二条消息处理的时间往后延五秒钟吗?
看下面的实验结果:【只有一个消费者的时候,发送两条】
7.png

第二条消息是05秒发出的,按照我们的想法,应该是25 + 5 = 30秒被收到,但是实际结果没有延迟5秒,消息发送到处理,这个过程两条消息间隔差不多都是20秒。说明是多线程消费的!
8.png

看一下RocketMQMessageListener注解里面的参数,我们会发现一些端倪。当未显式设置consumeMode时,默认使用ConsumeMode.CONCURRENTLY(并发消费)。
虽然是多线程的了,但是它还是会有最大线程数,一个人的能力毕竟是有限的。我们可以新增消费者2,监听同一个主题,在同一个消费者组里面。新增消费者实例会触发RocketMQ的Rebalance机制,系统会重新分配Topic的MessageQueue到所有消费者实例。假设原Topic有4个队列【创建Topic的时候默认是八个队列】,原消费者组有1个实例 → 新增1个实例后,队列分配可能变为:
消费者1 → 队列0、1   消费者2 → 队列2、3【每个队列仅被一个消费者实例处理,确保消息不重复消费】
但是我们要注意一个问题:

  • 若Topic队列数 ≥ 消费者实例数,新增实例会直接提升并行处理能力
  • 若Topic队列数 < 消费者实例数,多余实例将处于空闲状态(无队列分配)
倘若我们这样设置,那么就是单线程处理消息了:
  1. @Service
  2. @Slf4j
  3. @RocketMQMessageListener(
  4.         topic = TopicConstant.ORDER_EXPIRE_TOPIC,
  5.         consumerGroup = ConsumerGroup.ORDER_EXPIRE_GROUP,
  6.         messageModel = MessageModel.CLUSTERING,
  7.         consumeThreadMax = 1,  // 设置这个
  8.         consumeThreadNumber = 1 // 设置这个
  9. )
  10. public class OrderExpireConsumer implements RocketMQListener<String> {
  11.     public void onMessage(String message) {
  12.         log.info("【===============================】");
  13.         log.info("【订单过期消息消费者】收到消息:{}", message);
  14.         log.info("【订单过期消息消费者】处理订单消息开始......修改订单状态..等等操作");
  15.         try {
  16.             Thread.sleep(5000);
  17.         } catch (InterruptedException e) {
  18.             throw new RuntimeException(e);
  19.         }
  20.         log.info("【===============================】");
  21.         // 处理订单过期消息.....
  22.     }
  23. }
复制代码
SpringBoot的RocketMQTemplate是有重试机制的,所以我们上面只是模拟重试的:
  1. // DefaultMQProducerImpl.java
  2. private SendResult sendDefaultImpl(
  3.     Message msg,
  4.     final CommunicationMode communicationMode,
  5.     final SendCallback sendCallback,
  6.     final long timeout
  7. ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  8.     ...
  9.     // 重试
  10.     int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  11.     for (; times < timesTotal; times++) {
  12.         .......
  13.     }
  14. }
复制代码
3.2 消费消息

模拟这样一个场景,用户下单后,系统需调用库存服务扣减库存。若库存服务暂时不可用(如网络抖动),需自动重试;若多次重试仍失败,则转人工处理。
核心流程:订单服务发送订单消息到 order-topic,库存消费者尝试扣减库存,失败时触发重试,重试3次后仍失败,消息进入死信队列,死信队列消费者记录日志并发送告警。【注意哦,RocketMQ也是有死信队列的喔】
@RocketMQMessageListener的形式:配置简单,参数固定。
  1. // 发送消息
  2. public void sendReliabilityOrderTest( String orderId ) {
  3.     SendResult sendResult = rocketMQTemplate.syncSend(TopicConstant.CONSUMER_RELIABILITY_TOPIC, MessageBuilder.withPayload(orderId).build());
  4.     if ( sendResult.getSendStatus() == SendStatus.SEND_OK )
  5.         System.out.printf("【消费者可靠性测试】同步发送ok结果: %s\n", orderId);
  6.     else
  7.         System.out.printf("【消费者可靠性测试】同步发送失败: %s\n", orderId);
  8. }
  9. // 扣减库存的消费者
  10. @Service
  11. @Slf4j
  12. @RocketMQMessageListener(
  13.     topic = TopicConstant.CONSUMER_RELIABILITY_TOPIC,
  14.     consumerGroup = ConsumerGroup.CONSUMER_RELIABILITY,
  15.     messageModel = MessageModel.CLUSTERING,
  16.     maxReconsumeTimes = 2, // 最大重试次数
  17.     suspendCurrentQueueTimeMillis = 1500 // 暂停时间
  18. )
  19. public class ReliabilityConsumer implements RocketMQListener<String> {
  20.     @Override
  21.     public void onMessage(String s) {
  22.         log.info("【处理订单消息-准备扣减库存========================】");
  23.         boolean res = handleMessage(s); // 这里要抛异常出去----------
  24.     }
  25.     private boolean handleMessage(String message) {
  26.         // 随机生成true或者false
  27.         Random random = new Random();
  28.         // boolean flag = random.nextBoolean();
  29.         boolean flag = false;
  30.         log.info("处理消息:{}", message);
  31.         if ( !flag ) {
  32.             log.error("处理消息失败--出现了意想不到的异常");
  33.             throw new RuntimeException("处理消息失败");
  34.         }
  35.         // 处理消息
  36.         return true;
  37.     }
  38. }
复制代码
按照上面的写法,会自动重试喔,重试之后,我们会发现,dashboard里面出现了新的死信topic:%DLQ%reliability-order-group,它是以%DLQ%字符串开头的。但是它的topic后缀是那个处理消息失败的消费者组名称。
  1. // 死信消费
  2. @Service
  3. @Slf4j
  4. @RocketMQMessageListener(
  5.     topic = "%DLQ%" + ConsumerGroup.CONSUMER_RELIABILITY, // 死信TOPIC,注意他的组成
  6.     consumerGroup = ConsumerGroup.CONSUMER_RELIABILITY_DLQ, // 死信队列消费组
  7.     messageModel = MessageModel.CLUSTERING
  8. )
  9. public class ReliabilityDLQConsumer implements RocketMQListener<String> {
  10.     @Override
  11.     public void onMessage(String s) {
  12.         log.info("【死信队列消费者】收到消息:{}", s);
  13.         // 通知
  14.         log.info("【死信队列消费者】处理死信消息开始......通知人工处理~~~~");
  15.         // 存储该死信
  16.         log.info("【死信队列消费者】处理死信消息结束......存储数据..");
  17.     }
  18. }
复制代码
从官网我们得知:消费重试指的是,消费者在消费某条消息失败后,RocketMQ 服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。
会在如下情况触发重试:

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。
  • 消息处理超时,包括在PushConsumer中排队超时。
看到这里,相信读者们肯定有这样一个想法,这个注解好神奇啊,我们只需要按照约定,配置一下逻辑,一个消费者就完成了,包括其自动重试机制也有,那么,这个原理是什么呢?本文制作简单引导,详情见后续文章。
熟悉SpringBoot自动配置原理的同学肯定知道,我们导入了rocketmq-spring-boot-starter这样一个依赖,我们去类路径下找一下:
9.png
  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2. org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
复制代码
导入了其自动配置:RocketMQAutoConfiguration
  1. @Configuration
  2. @EnableConfigurationProperties(RocketMQProperties.class)
  3. @ConditionalOnClass({MQAdmin.class})
  4. @ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
  5. // 导入了这个`ListenerContainerConfiguration`
  6. @Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class,
  7.         ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class})
  8. @AutoConfigureAfter({MessageConverterConfiguration.class})
  9. @AutoConfigureBefore({RocketMQTransactionConfiguration.class})
  10. public class RocketMQAutoConfiguration implements ApplicationContextAware {
  11.     ....
  12. }
  13. @Configuration
  14. @ConditionalOnMissingBean(RocketMQMessageListenerContainerRegistrar.class)
  15. public class ListenerContainerConfiguration {
  16.     // 创建了这样一个bean
  17.     @Bean
  18.     public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
  19.         return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties);
  20.     }
  21. }
  22. // 实现了这样一个ApplicationContextAware接口
  23. public class RocketMQMessageListenerContainerRegistrar implements ApplicationContextAware {
  24.     ....
  25.     // 在下面的BeanPostProcessor里面被调用了,识别注解进行解析
  26.     public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
  27.         .....
  28.     }
  29. }
复制代码
同时呢,
  1. @Configuration
  2. @AutoConfigureAfter(RocketMQAutoConfiguration.class)
  3. public class RocketMQListenerConfiguration implements ImportBeanDefinitionRegistrar {
  4.         // 注册了这样一个bean定义,所以容器就会有一个这样的bean:RocketMQMessageListenerBeanPostProcessor
  5.     @Override
  6.     public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
  7.         if (!registry.containsBeanDefinition(RocketMQMessageListenerBeanPostProcessor.class.getName())) {
  8.             registry.registerBeanDefinition(RocketMQMessageListenerBeanPostProcessor.class.getName(),
  9.                     new RootBeanDefinition(RocketMQMessageListenerBeanPostProcessor.class));
  10.         }
  11.     }
  12. }
  13. // RocketMQMessageListenerBeanPostProcessor.java
  14. @Override
  15. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
  16.     Class<?> targetClass = AopUtils.getTargetClass(bean);
  17.     RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
  18.     if (ann != null) {
  19.         RocketMQMessageListener enhance = enhance(targetClass, ann);
  20.         if (listenerContainerRegistrar != null) {
  21.             // 这里调用registerContainer方法
  22.             listenerContainerRegistrar.registerContainer方法(beanName, bean, enhance);
  23.         }
  24.     }
  25.     return bean;
  26. }
  27. // 在上面注册好相关的bean之后,然后通过Lifecycle接口的start
  28. // 【在Spring容器创建过程中,创建好bean之后,在onRefresh方法里面】,在里面启动消费者
  29. @Override
  30. public void start() {
  31.     ....
  32.     try {
  33.         // DefaultMQPushConsumer.start---本质是这个
  34.         consumer.start();
  35.     } catch (MQClientException e) {
  36.         throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
  37.     }
  38.     this.setRunning(true);
  39.     log.info("running container: {}", this.toString());
  40. }
复制代码
大致过程就是这样,具体过程就留给读者自行阅读源码了。【需要读者了解spring创建初始化bean的完整流程,及其扩展点】-- 据说这个是Spring面试的重点
所以说,spring整合的时候除了注解的形式,我们还可以使用下面的形式:
  1. /**
  2. * @version 1.0
  3. * @Author txf
  4. * @Date 2025/6/30 14:48
  5. * @注释 @Bean的形式创建消费者
  6. */
  7. @Component
  8. @Slf4j
  9. public class BeanConsumer {
  10.     @Value("${rocketmq.name-server}")
  11.     private String nameServer;
  12.     @Bean
  13.     public DefaultMQPushConsumer pushConsumer() throws MQClientException {
  14.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ConsumerGroup.CONSUMER_RELIABILITY);
  15.         consumer.setNamesrvAddr(nameServer);
  16.         consumer.subscribe(TopicConstant.CONSUMER_RELIABILITY_TOPIC, "*");
  17.         // 配置重试参数
  18.         consumer.setMaxReconsumeTimes(3);  // 最多重试3次
  19.         consumer.setSuspendCurrentQueueTimeMillis(10000);  // 重试间隔10秒
  20.         consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
  21.             MessageExt messageExt = msgs.get(0); // 这里模拟方便,只取了第一个
  22.             String msg = new String(messageExt.getBody());
  23.             log.info("【===============================】 {}", msg);
  24.             try {
  25.                 // 业务逻辑....
  26.                 Random random = new Random();
  27.                 boolean b = random.nextBoolean();
  28.                 if (b) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  29.                 throw new Exception("消费失败");
  30.             } catch (Exception e) {
  31.                 log.error("消费失败--{}", e.getMessage());
  32.                 // 记录重试次数
  33.                 int reconsumeTimes = msgs.get(0).getReconsumeTimes();
  34.                 log.error("消费失败,重试次数:{}", reconsumeTimes, e);
  35.                 // 达到最大重试次数后转死信队列
  36.                 if (reconsumeTimes >= 2) {  // 已重试2次,本次是第3次
  37.                     // 发送到死信队列:sendToDLQ(msgs.get(0));
  38.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  39.                 }
  40.                 return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 触发重试
  41.             }
  42.         });
  43.         consumer.start();
  44.         return consumer;
  45.     }
  46. }
复制代码
3.3 消息幂等性

RocketMQ 保证消息至少送达一次,这意味着在网络不稳定、客户端重启、负载均衡重平衡等情况下,消息极有可能被重复消费。确保消息消费的幂等性是构建可靠消息系统的关键要求。我们的核心思路就是: 幂等性保证的责任主要在消费者端。消费者需要设计自己的业务逻辑,使得无论同一条消息被消费多少次,最终的业务结果都是一致的。
现在我们复现一下重复消费的情况:
  1. // 消息生产者
  2. @Service
  3. @Slf4j
  4. public class IdempotentProducer {
  5.     @Resource
  6.     private RocketMQTemplate rocketMQTemplate;
  7.     // 发送一个普通消息--topic是 idempotent-topic
  8.     public void sendNormalMessage(String message){
  9.         Message<String> msg = MessageBuilder.withPayload(message)
  10.                 // 发送消息的时候附带一个唯一标识 -- 这里仅是举个例子,实际使用中请自行处理
  11.                 // 需确保全局唯一性,推荐使用业务流水号(如订单号+时间戳),这里用UUID作为例子。
  12.                 .setHeader("msgId", UUID.randomUUID().toString().substring(0, 8))
  13.                 .build();
  14.         rocketMQTemplate.asyncSend(TopicConstant.IDEMPOTENT_TOPIC, msg, new SendCallback() {
  15.             @Override
  16.             public void onSuccess(SendResult sendResult) {
  17.                 log.info("发送成功: {}", sendResult);
  18.             }
  19.             @Override
  20.             public void onException(Throwable e) {
  21.                 log.error("发送失败: {}", e.getMessage());
  22.             }
  23.         });
  24.     }
  25. }
  26. // 消息消费者
  27. @Service
  28. @Slf4j
  29. @RocketMQMessageListener(
  30.         topic = TopicConstant.IDEMPOTENT_TOPIC,
  31.         consumerGroup = ConsumerGroup.IDEMPOTENT_GROUP,
  32.         maxReconsumeTimes = 1 // 这里重试一次
  33. )
  34. public class IdempotentConsumer implements RocketMQListener<MessageExt> {
  35.     @Override
  36.     public void onMessage(MessageExt message) {
  37.         String body = new String(message.getBody());
  38.         handleMessage(body);
  39.     }
  40.     private void handleMessage(String body) {
  41.         // 处理消息....
  42.         log.info("【幂等性消费者】处理消息中....: {}", body);
  43.         try {
  44.             TimeUnit.SECONDS.sleep(20); // 模拟处理业务20秒
  45.         } catch (InterruptedException e) {
  46.             throw new RuntimeException(e);
  47.         }
  48.         log.info("【幂等性消费者】处理消息完成, 入库咯~~~");
  49.         throw new RuntimeException("出现异常"); // 触发重试
  50.     }
  51. }
复制代码
上面模拟会出现如下情况:
10.png

如果没有做好幂等性,真实业务可能会一次操作,记录两次结果,假设是在扣库存方面,就会一次下单,然后扣减了两边甚至是多次库存,数据就会产生不一致。
幂等性主要从消费者端来处理:
  1. public class IdempotentConsumer implements RocketMQListener<MessageExt> {
  2.     private final Set<String> msgIds = new HashSet<>(128);
  3.     @Override
  4.     public void onMessage(MessageExt message) {
  5.         String msgId = message.getProperty("msgId"); // 自定义的
  6.         // 消息在Broker中的唯一ID,全局唯一,看起来适合做幂等,但是 msgId 是系统生成的,
  7.         // 与业务含义无关。无法直接关联到具体的业务操作实例。业务系统通常需要自己的业务唯一标识。
  8.         // String msgId1 = message.getMsgId();
  9.         String body = new String(message.getBody());
  10.         // 判断消息的唯一性
  11.         if ( judgeUnique(msgId) ) {
  12.             handleMessage(msgId, body);
  13.         }else {
  14.             log.info("消息重复,请勿重复处理");
  15.         }
  16.     }
  17.     private void handleMessage(String msgId, String body) {
  18.         // 处理消息....
  19.         log.info("【幂等性消费者】处理消息: {}", body);
  20.         try {
  21.             TimeUnit.SECONDS.sleep(20); // 模拟处理业务20秒
  22.             // 该消息处理成功之后,msgId可以标识为已处理了
  23.             log.info("【幂等性消费者】处理消息完成, 入库咯~~~");
  24.             msgIds.add(msgId);
  25.             /*
  26.             * if ( ok ) msgIds.add(msgId); // 如果处理是ok的
  27.             * else msgIds.remove(msgId); // 反之,移除掉
  28.             * */
  29.         } catch (InterruptedException e) {
  30.             log.info("处理消息异常", e);
  31.             // msgIds.remove(msgId);
  32.         }
  33.          // 这里再次模拟重试的话,消息处理完成之后,只会被处理一次了
  34.         throw new RuntimeException("出现异常");
  35.     }
  36.     private boolean judgeUnique(String msgId) {
  37.         // 这里就仅仅用一个Set来判断消息的唯一性
  38.         return msgIds.add(msgId);
  39.         /*
  40.         方式二:可以结合中间件Redis来判断
  41.         使用Redis的SETNX命令或Redisson实现分布式锁。消费者获取锁后处理消息,完成后释放锁。
  42.         但是要注意内存问题,如果key过多,可能会内存溢出,建议根据实际情况定期清理key
  43.         */
  44.         
  45.         // 方式三:使用数据库:【唯一约束等等】
  46.     }
  47. }
复制代码
11.png

具体什么方案还是看具体业务的,上面的幂等性处理是消息的唯一键标识。在实际过程中,我们还可以给消息处理加上一个状态,首先为每条消息或对应的业务操作分配一个全局唯一的业务标识(如订单ID、交易流水号、业务单据号等),在消费消息的时候,先去一个持久化存储(通常是数据库)中检查这个唯一标识对应的业务状态,基于当前状态和业务规则,决定是否执行以及如何执行操作。
【生产者端】:为需要保证幂等性的业务操作生成一个全局唯一的标识(uniqueKey),并放入消息的 properties 或消息体中。这个标识必须能唯一确定一个业务操作实例
【消费者端】:从消息中解析出 uniqueKey,先看这个uniqueKey是否存在过了,如果记录不存在:说明是第一次处理,正常执行业务逻辑(如创建订单、扣减库存等),并在操作成功后,在同一个数据库事务中将业务记录和状态写入数据库。如果存在的话,并且它的状态是已完成/已成功:直接丢弃消息或记录日志(幂等成功);如果状态是失败:根据业务规则决定是重试还是标记为最终失败;如果状态是处理中,可以选择等待一段时间重试检查,因为处理中的消息可能会失败喔。最后需要注意的是,检查状态、执行业务操作、更新状态到数据库,这三个步骤最好在一个数据库事务中完成,为了防止并发重复消费嘛。
上述过程如下图所示:
12.png
上面仅供参考,实际处理需要读者自行根据业务判断,其中Redis并不是必须要引入的。上图引入Redis仅仅是为了让处理成功的消息更快被找到,仅此而已。
end.参考


  • RocketMQ官网 https://rocketmq.apache.org/zh/docs/
  • https://blog.csdn.net/ctwctw/article/details/107463884

来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除

相关推荐

您需要登录后才可以回帖 登录 | 立即注册