
开始准备
首先需要安装rabbitmq和erlang,安装方式很简单
- 其中需要注意的是版本问题 我暂时用的3.8.5和23.0
https://www.rabbitmq.com/changelog.html - 还需要添加一下erlang
- python中需要安装pika库
代码中的参数可能不一样,那就是版本问题
send.py
生产者
import random
import pika
# credentials = pika.PlainCredentials("","")
# 新建连接,rabbitmq安装在本地则hostname为'localhost'
hostname = 'localhost'
parameters = pika.ConnectionParameters(hostname)
connection = pika.BlockingConnection(parameters)
# 创建通道
channel = connection.channel()
# 声明一个队列,生产者和消费者都要声明一个相同的队列, 用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello') # durable=True队列持久化
number = random.randint(1, 1000)
body = 'hello world:%s' % number
# 交换机; 队列名,写明将消息发往哪个队列; 消息内容
# routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列
channel.basic_publish(exchange="", routing_key="task_queue", body=message,
properties=pika.BasicProperties(delivery_mode=2 # 使消息持久化,
)
)
print(" [x] Sent %s" % body)
connection.close()
reveive.py
消费者
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import pika
hostname = 'localhost'
parameters = pika.ConnectionParameters(hostname)
connection = pika.BlockingConnection(parameters)
# 创建通道
channel = connection.channel()
# channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
time.sleep(10)
print(" [x] Received %r" % (body,))
# 告诉rabbitmq使用callback来接收信息
channel.basic_consume(queue="hello",on_message_callback=callback,auto_ack=True)
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理,按ctrl+c退出
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消费者中需要注意 channel.basic_consume中的auto_ack,用于是否需要确认
确认就是在callback中加一个 ch.basic_ack(delivery_tag=method.delivery_tag)
recv_safe.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import pika
hostname = 'localhost'
parameters = pika.ConnectionParameters(hostname)
connection = pika.BlockingConnection(parameters)
# 创建通道
channel = connection.channel()
channel.queue_declare(queue='hello') # durable=True队列持久化
def callback(ch, method, properties, body):
time.sleep(10)
print(" [x] Received %r" % (body,))
ch.basic_ack(delivery_tag=method.delivery_tag)
# ack 确认
# 告诉rabbitmq使用callback来接收信息
channel.basic_consume(queue="hello", on_message_callback=callback,
auto_ack=False) # auto_ack True生产者发出之后就不管了,不需要确认 False
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理,按ctrl+c退出
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
队列持久化,需要在声明队列时将属性durable设置为True
消息持久化,需要将basic_publish中的properties属性设置pika.BasicProperties(delivery_mode=2 )
如果队列关闭,消息也会关闭
这个是在本地使用的,如果连接其他需要用户认证和一个类似白名单的,具体之后再加
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)