1.RabbitMQ是什么?
消息队列,应用程序写和读消息队列来实现通信。 2.使用示例 1)生产者send.py发送消息到队列import pika# RabbitMQ用户名,密码user_name = 'guest'password = 'guest'credential = pika.PlainCredentials(user_name, password)# 建立连接connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credential))# 消息读写管道channel = connection.channel()# 创建队列channel.queue_declare(queue='queue')# 发送消息到队列,exchange:路由器(根据类型,路由到相应的队列),routing_key:队列名称,body:消息体channel.basic_publish( exchange='', routing_key='queue', body='hello world')print('send....')# 关闭连接connection.close()
2)消费者receive.py从队列获取消息
import pikauser_name = 'guest'password = 'guest'credential = pika.PlainCredentials(user_name, password)connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credential))channel = connection.channel()# 队列不存在则创建,只有一个队列会创建channel.queue_declare(queue='queue')# 回调处理消息def callback(ch, method, properties, body): print('receive.py...%r'%body)# 从队列接收消息,callback:消息接收后回调处理函数channel.basic_consume( callback, queue='queue', no_ack=True)print('waiting...')# 开始监听消息队列channel.start_consuming()
运行结果:
$ python receive.py receive.py: Received message 'hello world' [*] Waiting for messages. To exit press CTRL+C
3.RabbitMQ基本概念
1)收发消息过程 Broker:消息队列服务器实体 消息:就是一个简单的字符串,每个消息都有一个路由键(routing key)属性 connection:应用程序和broker的网络连接 channel:进行消息读写的管道 exchange:交换机,接收消息,根据路由键转发消息到绑定的队列 绑定:交换机(路由表)和队列绑定起来 队列:消息的容器,一个消息可投入一个或多个队列,消费者从队列取走消息 2)exchange 接收消息,根据路由键转发消息到绑定的队列 exchange有四种类型:direct,topic,headers,fanout 每种规则匹配队列时,CPU的开销是不同的,可以根据不同需求,选择不同类型的交换机。 Direct交换机:完全匹配,单播 routing key和对列名完全匹配Topic交换机:正则匹配,组播
根据binding-key匹配符合的routing-key, 匹配规则:有两种通配符"#"和"",#表示0或多个单词,表示一个单词 如:binging-key:*.sock.#匹配routing-key: usd.stock和eur.stock.db,但是不匹配stock.maFanout交换机:消息转发所有绑定对列,最快,广播
fanout不处理路由键,简单将对列绑定到交换机,消息将转发到所有绑定的队列 3)注意事项 没有队列绑定到交换机,则发送到该交换机的消息会丢失 一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定 不能更改交换机类型4.RabbmitMQ属性
1)持久性 如果启用,队列再Server重启前都有效 2)自动删除 如果启用,那么队列将在所有的消费者停止使用之后自动删除自身 3)惰性 如果没有声明队列,那么在执行到使用的时候回导致异常,并不会主动声明 4)排他性 如果启用,队列只能被声明它的消费者使用5.RabbitMQ持久化
1)客户端丢失 RabbitMQ分发完消息后,就会从内存中把消息删除掉。如果客户端连接断开了,那么客户端正在处理的消息和等待处理的消息,都将丢失。 因此,RabbitMQ引入了消息确认机制no_ack=False
将receive.py中channel.basic_consume消息接收事件中,no_ack属性设为False,客户端消息处理完毕后会发送确认给RabbitMQ服务器。
需要注意的是,一定要确保在任何情况下,都会发送确认给RabbitMQ,否则将引起内存泄漏。 2)RabbitMQ服务端丢失 RabbitMQ重启后,消息将丢失。不过,RabbitMQ提供了持久化机制,可以将消息持久化到磁盘。channel.queue_declare(queue='queue', durable=True)
6.常用命令
1)启动rabbitmq-server &
2)队列重置
rabbitmqctl stop_apprabbitmqctl resetrabbitmq stop
3)关闭
rabbitmqctl stop
4)列举所有用户
rabbitmqctl list_users
5)列举所有队列
rabbitmqctl list_queues
6)添加用户
rabbitmqctl add_user user_name user_passwd
7)设置用户角色为管理员
rabbitmqctl set_user_tags user administrator
8)权限设置
rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
9)查看状态
rabbitmqctl status
10)安装web管理插件
rabbitmq-plugins enable rabbitmq_management
可以通过http://localhost:15672查看服务器状态