找回密码
 立即注册
首页 业界区 安全 Redis实现消息队列

Redis实现消息队列

宛蛲 昨天 14:09
Redis 凭借其高性能、低延迟和丰富的数据结构,常被用来实现轻量级消息队列。
1、List实现简单队列

List 是 Redis 最基础的消息队列实现方式,利用其 有序、可重复 的特性,通过 LPUSH(生产者推送)和 BRPOP(消费者拉取)实现消息传递。
原理

  • 生产者:使用 LPUSH 将消息从列表左侧插入。
  • 消费者:使用 BRPOP 阻塞地从列表右侧取出消息。BRPOP 会在列表为空时自动阻塞连接,直到有新消息到来或超时,避免了消费者无效的轮询。
示例代码
  1. import redis
  2. import threading
  3. import time
  4. # 连接 Redis
  5. r = redis.Redis(host='localhost', port=6379, db=0)
  6. QUEUE_KEY = 'message_queue'
  7. # 生产者
  8. def producer():
  9.     for i in range(5):
  10.         message = f"消息{i}"
  11.         r.lpush(QUEUE_KEY, message)
  12.         print(f"生产者发送: {message}")
  13.         time.sleep(1)  # 模拟生产延迟
  14. # 消费者
  15. def consumer():
  16.     while True:
  17.         # 阻塞式获取消息,超时时间 0(永久等待)
  18.         _, message = r.brpop(QUEUE_KEY, timeout=0)
  19.         print(f"消费者接收: {message.decode()}")
  20.         time.sleep(0.5)  # 模拟处理延迟
  21. # 启动生产者和消费者
  22. if __name__ == "__main__":
  23.     t1 = threading.Thread(target=producer)
  24.     t2 = threading.Thread(target=consumer)
  25.     t1.start()
  26.     t2.start()
  27.     t1.join()
  28.     t2.join()
复制代码
优点:
实现简单,性能高,利用 BRPOP 避免了轮询。
缺点:

  • 无消息确认(ACK)机制:一旦 BRPOP 取出消息,消息就从队列中删除了。如果消费者在处理消息过程中崩溃,这条消息将永久丢失。
  • 无重试机制:无法处理消费失败的消息。
  • 单消费者:虽然多个消费者可以同时 BRPOP,但一条消息只会被一个消费者获取。这是优点也是缺点,取决于你的需求。
适用场景:仅适用于对消息可靠性要求极低的场景,比如丢一两条消息也无所谓。
2、Pub/Sub 实现广播队列

Pub/Sub(发布 - 订阅)是一种广播模式,生产者发布消息到指定 “频道”,所有订阅该频道的消费者都能收到消息。
原理

  • 生产者:向指定的频道(Channel)PUBLISH 消息。
  • 消费者:SUBSCRIBE 一个或多个频道,即可收到发布到该频道的所有消息。
示例代码
  1. import redis
  2. import time
  3. import threading
  4. r = redis.Redis(host='localhost', port=6379, db=0)
  5. CHANNEL_NAME = 'my:news:channel'
  6. def publisher():
  7.     """ 消息发布者 """
  8.     for i in range(3):
  9.         message = f"News Update #{i}"
  10.         # 发布消息到频道
  11.         r.publish(CHANNEL_NAME, message)
  12.         print(f"Published: {message}")
  13.         time.sleep(1)
  14.     print("Publisher finished.")
  15. def subscriber(subscriber_id):
  16.     """ 消息订阅者 """
  17.     print(f"Subscriber {subscriber_id} started...")
  18.     # 创建独立的连接用于订阅(重要!订阅会阻塞连接)
  19.     pubsub = r.pubsub()
  20.     pubsub.subscribe(CHANNEL_NAME) # 订阅频道
  21.     # 监听消息
  22.     for message in pubsub.listen():
  23.         if message['type'] == 'message':
  24.             data = message['data'].decode('utf-8')
  25.             print(f"Subscriber {subscriber_id} received: {data}")
  26. # 启动一个发布者和三个订阅者
  27. threading.Thread(target=publisher).start()
  28. threading.Thread(target=subscriber, args=("S1",)).start()
  29. threading.Thread(target=subscriber, args=("S2",)).start()
  30. threading.Thread(target=subscriber, args=("S3",)).start()
复制代码
优点:
高效的广播机制,实时性好。
缺点:

  • 消息非持久化:Redis 不会保存 Pub/Sub 的消息。如果一个消费者在发布者发布消息时处于离线状态,它将永远错过这条消息。没有“消息堆积”的概念。
  • 无消息回溯:无法重新消费历史消息。
适用场景:适用于实时通知、聊天应用、状态广播等场景,绝对不能用于需要保证消息必达的业务队列。
3、Streams 实现高级消息队列

Stream 是 Redis 5.0 新增的数据类型,专为消息队列设计,支持持久化消费确认分组消费等高级特性,接近专业 MQ(如 Kafka)的功能。
原理
持久化:消息写入后持久化到磁盘,Redis 重启不丢失。
消费确认:支持 XACK 确认机制,确保消息被处理。
分组消费:多个消费者可以组成一个组,来共同消费同一个 Stream。组保证了每条消息只会被组内的一个消费者处理,实现了负载均衡。
消息回溯:通过消息ID可回溯历史消息。
  1. import redis
  2. import threading
  3. import time
  4. r = redis.Redis(host='localhost', port=6379, db=0)
  5. STREAM_KEY = 'order_events'
  6. GROUP_NAME = 'order_group'
  7. CONSUMER1_NAME = 'consumer_1'
  8. CONSUMER2_NAME = 'consumer_2'
  9. # 初始化消费组(第一次运行时创建)
  10. def init_group():
  11.     try:
  12.         r.xgroup_create(STREAM_KEY, GROUP_NAME, id=0, mkstream=True)
  13.         # 创建消费组(0 表示从第一条消息开始消费)
  14.         print(f"消费组 {GROUP_NAME} 创建成功")
  15.     except redis.exceptions.ResponseError as e:
  16.         if "already exists" in str(e):
  17.             print(f"消费组 {GROUP_NAME} 已存在")
  18.         else:
  19.             raise
  20. # 生产者:发送订单消息
  21. def producer():
  22.     for i in range(5):
  23.         order_id = f"order_{i}"
  24.         # 添加消息,* 表示自动生成消息ID(格式:时间戳-序列号)
  25.         msg_id = r.xadd(STREAM_KEY, {
  26.             'order_id': order_id,
  27.             'status': 'created',
  28.             'amount': 100 + i*10
  29.         })
  30.         print(f"生产者发送消息: {msg_id.decode()} -> {order_id}")
  31.         time.sleep(1)
  32. # 消费者1(属于消费组)
  33. def consumer1():
  34.     while True:
  35.         # 从消费组获取未消费的消息(> 表示最新未消费)
  36.         # BLOCK 0 表示阻塞等待,COUNT 1 表示一次取1条
  37.         messages = r.xreadgroup(
  38.             GROUP=GROUP_NAME,
  39.             CONSUMER=CONSUMER1_NAME,
  40.             STREAMS={STREAM_KEY: '>'},
  41.             BLOCK=0,
  42.             COUNT=1
  43.         )
  44.         if messages:
  45.             _, msg_list = messages[0]
  46.             msg_id, msg = msg_list[0]
  47.             print(f"消费者1 接收: {msg_id.decode()} -> {msg[b'order_id'].decode()}")
  48.             
  49.             # 处理消息(模拟业务逻辑)
  50.             time.sleep(0.5)
  51.             
  52.             # 确认消息已处理
  53.             r.xack(STREAM_KEY, GROUP_NAME, msg_id)
  54.             print(f"消费者1 确认: {msg_id.decode()}")
  55. # 消费者2(属于同一消费组,实现负载均衡)
  56. def consumer2():
  57.     while True:
  58.         messages = r.xreadgroup(
  59.             GROUP=GROUP_NAME,
  60.             CONSUMER=CONSUMER2_NAME,
  61.             STREAMS={STREAM_KEY: '>'},
  62.             BLOCK=0,
  63.             COUNT=1
  64.         )
  65.         if messages:
  66.             _, msg_list = messages[0]
  67.             msg_id, msg = msg_list[0]
  68.             print(f"消费者2 接收: {msg_id.decode()} -> {msg[b'order_id'].decode()}")
  69.             
  70.             # 处理消息
  71.             time.sleep(0.8)
  72.             
  73.             # 确认消息
  74.             r.xack(STREAM_KEY, GROUP_NAME, msg_id)
  75.             print(f"消费者2 确认: {msg_id.decode()}")
  76. if __name__ == "__main__":
  77.     init_group()
  78.    
  79.     # 启动生产者和两个消费者
  80.     t_producer = threading.Thread(target=producer)
  81.     t_consumer1 = threading.Thread(target=consumer1)
  82.     t_consumer2 = threading.Thread(target=consumer2)
  83.    
  84.     t_producer.start()
  85.     t_consumer1.start()
  86.     t_consumer2.start()
  87.    
  88.     t_producer.join()
  89.     t_consumer1.join()
  90.     t_consumer2.join()
复制代码
优点:

  • 支持持久化、消息确认、分组消费(负载均衡)。
  • 可回溯历史消息,避免消息丢失。
  • 接近专业 MQ 的功能,适合生产环境。
缺点:
实现相对复杂,Redis 版本需 ≥5.0。
适用场景:高可靠性要求的业务(如订单系统、支付通知)、需要负载均衡的多消费者场景。

来源:豆瓜网用户自行投稿发布,如果侵权,请联系站长删除

相关推荐

您需要登录后才可以回帖 登录 | 立即注册