概述

消息队列(Message Queue)是一种进程间通信方式,用于解决异步处理、削峰填谷、解耦等问题。1

应用场景

场景说明示例
异步处理非核心流程异步执行订单完成后发邮件
流量削峰缓解突发流量秒杀系统
日志处理异步收集分析ELK日志系统
事件驱动系统间事件通知分布式事务

核心概念

消息模型

发布者 ──▶ 交换机 ──▶ 队列 ──▶ 消费者
           (Exchange)  (Queue)  (Consumer)

流程

  1. 生产者将消息发送到Exchange
  2. Exchange根据路由规则将消息分发到Queue
  3. 消费者从Queue获取消息并处理

JMS模型

Java Message Service (JMS) 定义了两种消息模型:

模型说明特点
Point-to-Point (P2P)点对点,一个消息只能被一个消费者消费队列、持久化
Publish/Subscribe (Pub/Sub)发布订阅,消息被所有订阅者接收Topic、持久化

可靠性保证

级别说明可靠性
At most once最多一次,可能丢消息
At least once至少一次,可能重复
Exactly once精确一次

RabbitMQ

核心组件

组件说明
BrokerRabbitMQ服务器实例
Virtual Host隔离的AMQP环境
Exchange消息路由器
Queue消息存储队列
BindingExchange与Queue的绑定关系
Routing Key路由关键字

Exchange类型

类型路由规则
Direct完全匹配Routing Key
Fanout广播到所有绑定队列
Topic通配符匹配(* 或 #)
Headers根据消息头匹配

RabbitMQ配置

import pika
 
# 连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
 
# 声明交换机
channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)
 
# 声明队列
channel.queue_declare(queue='order_queue', durable=True)
 
# 绑定
channel.queue_bind(exchange='orders', queue='order_queue', routing_key='new_order')
 
# 发送消息
channel.basic_publish(
    exchange='orders',
    routing_key='new_order',
    body='Order data...',
    properties=pika.BasicProperties(delivery_mode=2)  # 持久化
)
 
# 消费消息
def callback(ch, method, properties, body):
    print(f"Received: {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认
 
channel.basic_consume(queue='order_queue', on_message_callback=callback)
channel.start_consuming()

消息确认机制

# 手动确认
channel.basic_ack(delivery_tag=method.delivery_tag)
 
# 否定确认(重新入队)
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
 
# 设置预取数量
channel.basic_qos(prefetch_count=10)  # 最多10条未确认

Kafka

核心概念

概念说明
Topic消息主题
Partition分区,实现并行处理
Replica分区副本,保证高可用
Offset消费者在分区中的位置
Producer消息生产者
Consumer消息消费者

Kafka架构

Topic: orders
┌─────────────────────────────────────────────┐
│ Partition 0  │ Partition 1  │ Partition 2  │
│ [0,1,2,3,4]  │ [0,1,2,3]   │ [0,1,2,3,4,5]│
└─────────────────────────────────────────────┘
     ↓副本复制           ↓副本复制
Leader → Follower    Leader → Follower

Kafka生产者

from kafka import KafkaProducer
import json
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
 
# 发送消息
future = producer.send('orders', value={'order_id': 123, 'amount': 100})
future.get(timeout=10)  # 同步等待
 
# 异步发送
producer.send('orders', value={'order_id': 124, 'amount': 200})
producer.flush()  # 确保所有消息发送完成

Kafka消费者

from kafka import KafkaConsumer
 
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='order_processor',  # 消费者组
    auto_offset_reset='earliest',  # 从最早开始消费
    enable_auto_commit=True  # 自动提交offset
)
 
for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, "
          f"Offset: {message.offset}, Value: {message.value}")

分区策略

# 指定分区
producer.send('orders', value=data, partition=0)
 
# 手动指定分区键
producer.send('orders', value=data, key=b'user_123')
 
# 自定义分区器
from kafka.processor import Partitioner
 
class UserPartitioner(Partitioner):
    def partition(self, key, num_partitions):
        return int(key) % num_partitions

三大消息队列对比

特性RabbitMQKafkaActiveMQ
定位企业级消息代理分布式流平台传统消息中间件
吞吐量中等(~10万/秒)高(百万/秒)
延迟低(μs级)低(ms级)
消息持久化支持支持支持
事务支持支持支持
优先级队列支持不支持支持
延迟队列插件支持不支持支持
集群支持原生支持支持

选型建议

场景推荐
电商订单处理RabbitMQ(事务支持好)
日志收集分析Kafka(高吞吐)
小型系统ActiveMQ(简单)

可靠性实现

消息持久化

# RabbitMQ持久化
channel.queue_declare(queue='persistent_queue', durable=True)
channel.basic_publish(
    body=data,
    properties=pika.BasicProperties(delivery_mode=2)  # 持久化
)
 
# Kafka持久化(默认)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all'  # 等待所有副本确认
)

消费幂等

# 方式1:数据库唯一键
def process_order(order):
    try:
        db.insert('orders', order)  # 唯一键冲突则忽略
    except DuplicateKeyError:
        pass  # 重复订单,忽略
 
# 方式2:Redis去重
def process_order(order):
    if redis.setnx(f'order:{order.id}', '1', ex=86400):
        db.process(order)

事务消息

# Kafka事务(Exactly Once语义)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    transactional_id='order_producer'  # 开启事务
)
 
producer.init_transactions()
producer.begin_transaction()
 
try:
    producer.send('orders', value=order_data)
    producer.send('inventory', value=inventory_data)
    producer.commit_transaction()
except Exception:
    producer.abort_transaction()

高可用设计

RabbitMQ镜像队列

# 镜像队列策略
rabbitmqctl set_policy ha-two "^orders\." '{"ha-mode":"exactly","ha-params":2}'

Kafka分区副本

# Topic配置
topic_config = {
    'topic': 'orders',
    'num_partitions': 6,
    'replication_factor': 3,  # 3副本
    'config': {
        'retention.ms': '604800000',  # 保留7天
        'min.insync.replicas': '2'    # 最少2个副本确认
    }
}

消费端负载均衡

消费者组 A: [Consumer1, Consumer2, Consumer3]
                    ↓ 分区分配
Topic: orders [P0, P1, P2, P3, P4, P5]
                      ↓
              Consumer1: P0, P3
              Consumer2: P1, P4
              Consumer3: P2, P5

参考资料


相关主题

Footnotes

  1. 本词条综合自 RabbitMQ TutorialsKafka Documentation