一、消息队列
1.1 AMQP 是什么
AMQP:即(Advanced Message Queuing Protocol 高级消息队列协议),提供统一消息服务。是应用层协议的一个开放标准,为面向消息的中间件设计。RabbitMQ 是 AMQP 的 Erlang 语言实现。
1.2 消息队列是什么
消息队列从字面意思上来说,就是放置消息的队列,遵循先进先出原则。
1.3 为什么使用消息队列
流量削峰
当你的系统处理量不足时,你就可以将消息放在消息队列中做缓冲,然后逐步处理即可。
异步处理
当你需要调用一个用时很长的任务时,你可以使用异步调用的方式,然后让这个任务给消息队列发一个消息,这样你就不用一直等待了。
消息广播
比如消息订阅,我们只需要管消息有没有进入队列,下游有没有消费消息我们不用管。
二、安装
docker 一键安装
# latest RabbitMQ 4.0.x
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
默认账户密码是 guest,但是注意,这个账户密码只能在安装了RabbitMQ的机器上登录,如果你是服务器安装的,那么默认是不可以登录的,建议新建一个管理员用户,然后删除 guest用户。
三、核心概念
3.1 Virtual Host
虚拟主机是消息队列中非常重要的概念,它提供了资源隔离、权限控制和配置独立性,使得同一台服务器上的多个应用能够安全、独立地运行在自己的“虚拟环境”中。
3.2 Queue
RabbitMQ 的 queue 就是存储消息的地方
3.3 Exchange
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 路由到队列
3.4 Broker
在技术领域,Broker 通常翻译为 “代理” 或 “中间人”。在消息队列系统(如 RabbitMQ)中,Broker 指的是消息代理服务器,负责接收、存储和转发消息。
接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
3.5 Connection
生产者-消费者 和 Broker 之间的 TCP 连接。
3.6 Channel
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP 连接的开销将是巨大的,效率也较低。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
3.7 Binding
exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据。
3.8 生产者-消费者
生产消息和消费消息的应用程序。
3.9 一图胜千言
简化图
生产者生产消息,推送到交换机中,交换机路由到队列,消费者消费。
四、交换机和交换机类型
4.1 简介
服务器发送消息不会直接发送到队列中,只能将消息发送给交换机,然后根据确定的规则,RabbitMQ将会决定消息该投递到哪个队列。这些规则称为路由键(routing key),队列通过路由键绑定到交换机上。消息发送到服务器端,消息也有自己的路由键(也可以是空),RabbitMQ 也会将消息和消息指定发送的交换机的绑定(binding,就是队列和交互机的根据路由键映射的关系)的路由键进行匹配。
4.2 Direct Exchange(直连交换机)
作用:将消息路由到 Binding Key 和 Routing Key 完全匹配的队列。
使用场景:适用于消息一对一精确传递的场景。
示例:
- 生产者发送消息时指定
Routing Key = "order"。 - 队列绑定到交换机时指定
Binding Key = "order"。 - 只有
Routing Key和Binding Key完全匹配时,消息才会被路由到该队列。
4.3 Fanout Exchange(扇形交换机)
作用:将消息广播到所有绑定到该交换机的队列,忽略 Routing Key。
使用场景:适用于消息广播的场景,例如日志系统或通知系统。
示例:
- 生产者发送消息到
Fanout Exchange - 所有绑定到该交换机的队列都会收到消息,无论
Routing Key是什么。
4.4 Topic Exchange(主题交换机)
作用:根据 Routing Key 和 Binding Key 的模式匹配来路由消息。Binding Key 支持通配符:
*匹配一个单词。#匹配零个或多个单词。
使用场景:适用于需要灵活路由规则的场景,例如根据消息类型或来源进行路由。
示例:
- 生产者发送消息时指定
Routing Key = "order.create"。 - 队列绑定到交换机时指定
Binding Key = "order.*"。 - 消息会被路由到该队列,因为
Routing Key匹配Binding Key的模式。
4.5 Headers Exchange(头交换机)
作用:根据消息的 Headers 属性(键值对)进行路由,而不是 Routing Key。绑定队列时可以指定匹配规则(x-match):
x-match = all:Headers必须完全匹配所有键值对。x-match = any:Headers只需匹配任意一个键值对。
使用场景:适用于需要基于消息属性进行复杂路由的场景。
示例:
- 生产者发送消息时指定
Headers = {"type": "order", "status": "new"}。 - 队列绑定到交换机时指定
x-match = all和Headers = {"type": "order", "status": "new"}。 - 消息会被路由到该队列,因为
Headers完全匹配。
4.6 Default Exchange(默认交换机)
作用:RabbitMQ默认创建的交换机,是一个特殊的Direct Exchange。所有队列都会自动绑定到该交换机,Binding Key 为队列名称。
使用场景:适用于简单的消息传递场景。
示例:
- 生产者发送消息时指定
Routing Key = "queue_name"。 - 消息会被路由到名为
queue_name的队列。
五、入门实战
5.1 Hello World
生产者
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='admin的队列')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消费者
#!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
5.2 工作队列
在这个教程中,我们将创建一个工作队列,用于在多个工作者之间分配耗时任务。
设置prefetch_count=1 这使用 basic.qos 协议方法告诉 RabbitMQ 一次不要给一个工作者发送多于一条消息。换句话说,不要在工作者处理并确认了上一条消息之前发送新的消息。相反,它会将其发送给下一个尚未忙碌的工作者。
生产者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()
消费者
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
5.3 发布者-订阅者
上面的工作队列模式,让我们只能把消息发给一个消费者,现在我们要发送给每个消费者
生产者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: 我是顶顶顶!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()
消费者
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
5.4 路由
在 RabbitMQ 中,Routing 是指消息从 Exchange 传递到 Queue 的过程。路由的核心是通过 Routing Key 和 Binding Key 的匹配规则,决定消息应该被发送到哪些队列。
Direct Exchange 的路由算法非常简单:消息会被发送到与消息的Routing Key完全匹配的 Binding Key 的队列。
匹配规则:Routing Key 必须与 Binding Key 完全一致。
生产者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(f" [x] Sent {severity}:{message}")
connection.close()
消费者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {method.routing_key}:{body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
5.5 主题
在RabbitMQ中,Topic Exchange 是一种灵活的路由模式,允许根据 Routing Key和Binding Key的模式匹配将消息路由到队列。主题模式的核心是支持通配符匹配,适用于需要根据消息的某些属性(如消息类型、来源等)进行动态路由的场景。
有需求可自行谷歌 🙂
5.6 RPC
远程过程调用
有需求可自行谷歌 🙂