
我们经常使用如下脚本监听MQ,它在阿里云上工作正常,但是在本地不能保持长连接,会报错:ConnectionResetError(104, 'Connection reset by peer')
Code mq1.py
"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
We usually use this script to listen to MQ. It works on AliYun, but it cannot hold a long-lived connection on local devices.
It reports the exception: ConnectionResetError(104, 'Connection reset by peer').
"""
import pika
import threading
import time
def on_message(channel, method_frame, header_frame, body):
    """Handle one delivery by simulating ~200 s of work, then acknowledge it.

    The work runs inside the consumer callback itself, i.e. on the thread that
    called ``channel.start_consuming()`` (the article's output shows the same
    thread id for the main thread and for this callback). The article reports
    that the final ``basic_ack`` then fails with ``StreamLostError`` wrapping
    ``ConnectionResetError(104, 'Connection reset by peer')``.

    Args:
        channel: Channel the message was delivered on; used to send the ack.
        method_frame: Delivery metadata; only ``delivery_tag`` is read.
        header_frame: Message properties (unused).
        body: Raw message payload; only printed.
    """
    print(f'on_message thread id: {threading.get_ident()}')
    delivery_tag = method_frame.delivery_tag
    print(body, "start")
    # Simulated long-running task: 10 steps of 20 seconds each.
    for i in range(10):
        print(i)
        time.sleep(20)
    print(body, "end")
    channel.basic_ack(delivery_tag)
# Connect to the broker with a 5-second heartbeat and consume from the
# durable queue "standard", one unacknowledged message at a time.
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)
print(f'main thread id: {threading.get_ident()}')
try:
    # Blocks here; on_message is invoked on this same thread.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop the consumer before closing the connection.
    channel.stop_consuming()

connection.close()
Result
执行情况:
(Test) [user@user test]$ python mq1.py
main thread id: 140682766104384
on_message thread id: 140682766104384
b'1' start
0
1
2
3
4
5
6
7
8
9
b'1' end
Traceback (most recent call last):
  File "mq1.py", line 38, in <module>
    channel.start_consuming()
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
    self._process_data_events(time_limit=None)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 833, in process_data_events
    self._dispatch_channel_events()
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "mq1.py", line 24, in on_message
    channel.basic_ack(delivery_tag)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2112, in basic_ack
    self._flush_output()
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
从结果来看,异常发生在一次长时间的消费过程(200s)完成之后,具体是在调用channel.basic_ack(delivery_tag)时报错。推测此时与MQ Server的连接已经被重置(ConnectionResetError(104, 'Connection reset by peer')):回调函数阻塞期间,BlockingConnection无法处理心跳,而这里设置了heartbeat=5,服务端很可能因此判定连接超时并将其关闭,此时再主动确认就会报错。
Solution1 网上搜索到的做法一般是调用connection.process_data_events()或者channel.connection.sleep(5.0)阻塞Channel来保持连接,即在处理耗时任务时每隔一段时间调用一次。
Code mq1_.py
"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
Keep calling connection.process_data_events() (blocking the channel) to keep the connection to MQ alive.
"""
import pika
import threading
import time
def keep_connection(connection):
    """Poll the connection every 10 seconds for as long as it stays open.

    Intended to run on a helper thread while the main thread is busy inside
    the consumer callback. The article reports that this keeps the connection
    alive for a while but eventually fails with
    ``StreamLostError: IndexError('pop from an empty deque')``.
    NOTE(review): pika documents its connections as not thread-safe, which is
    presumably why calling ``process_data_events`` from a second thread breaks.

    Args:
        connection: The ``pika.BlockingConnection`` shared with the consumer.
    """
    print(f'keep_connection thread id: {threading.get_ident()}')
    time.sleep(10)
    while connection.is_open:
        print("keep_connection")
        # channel.connection.sleep(5.0)
        connection.process_data_events()
        time.sleep(10)
        # channel.basic_publish(exchange='', routing_key=dl_queue_name, body="1")
    print("keep_connection close")
def on_message(channel, method_frame, header_frame, body):
    """Handle one delivery by simulating ~200 s of work, then acknowledge it.

    The work runs inside the consumer callback, on the thread that called
    ``channel.start_consuming()``; in this variant a separate thread is
    expected to poll the connection in the meantime.

    Args:
        channel: Channel the message was delivered on; used to send the ack.
        method_frame: Delivery metadata; only ``delivery_tag`` is read.
        header_frame: Message properties (unused).
        body: Raw message payload; only printed.
    """
    print(f'on_message thread id: {threading.get_ident()}')
    delivery_tag = method_frame.delivery_tag
    print(body, "start")
    # Simulated long-running task: 10 steps of 20 seconds each.
    for i in range(10):
        print(i)
        time.sleep(20)
    print(body, "end")
    channel.basic_ack(delivery_tag)
# Connect to the broker with a 5-second heartbeat and consume from the
# durable queue "standard", one unacknowledged message at a time.
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)
# Helper thread that periodically calls connection.process_data_events().
t = threading.Thread(target=keep_connection, args=(connection,))
t.start()
print(f'main thread id: {threading.get_ident()}')
try:
    # Blocks here; on_message is invoked on this same thread.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop the consumer before closing the connection.
    channel.stop_consuming()

connection.close()
Result
(Test) [user@user test]$ python mq1_.py
keep_connection thread id: 139910029436672
main thread id: 139910160156480
on_message thread id: 139910160156480
b'123' start
0
keep_connection
keep_connection
1
keep_connection
2
keep_connection
keep_connection
3
keep_connection
keep_connection
4
keep_connection
keep_connection
5
keep_connection
keep_connection
6
keep_connection
keep_connection
7
keep_connection
keep_connection
8
keep_connection
keep_connection
9
keep_connection
keep_connection
b'123' end
keep_connection
keep_connection
...
keep_connection
keep_connection
Traceback (most recent call last):
  File "mq.py", line 53, in <module>
    channel.start_consuming()
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
    self._process_data_events(time_limit=None)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 824, in process_data_events
    self._flush_output(common_terminator)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: IndexError('pop from an empty deque')
keep_connection close
这种解决方式可以维持较长时间,但是最后还是断开了,报错:IndexError('pop from an empty deque')
Solution2 另一种解决方法:由于pika不是线程安全的,接收消息后的处理和ACK响应消息需要借助另外的线程来完成,参考官方示例有如下代码:
Code mq2.py
"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import pika
import threading
import time
def ack_message(channel, delivery_tag):
    """Acknowledge a delivery if the channel is still open.

    Args:
        channel: Channel the message was delivered on.
        delivery_tag: Tag of the delivery to acknowledge.
    """
    print(f'ack_message thread id: {threading.get_ident()}')
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        pass
def do_work(channel, delivery_tag, body):
    """Simulate ~200 s of work, then ask the connection to send the ack.

    The ack is not sent from here directly: ``ack_message`` is wrapped in a
    ``functools.partial`` and handed to
    ``channel.connection.add_callback_threadsafe``.

    Args:
        channel: Channel the message was delivered on.
        delivery_tag: Tag of the delivery to acknowledge once work is done.
        body: Raw message payload; only printed.
    """
    print(f'do_work thread id: {threading.get_ident()}')
    print(body, "start")
    # Simulated long-running task: 10 steps of 20 seconds each.
    for i in range(10):
        print(i)
        time.sleep(20)
    print(body, "end")
    cb = functools.partial(ack_message, channel, delivery_tag)
    channel.connection.add_callback_threadsafe(cb)
def on_message(channel, method_frame, header_frame, body):
    """Start a new thread running ``do_work`` for the delivery and return.

    Args:
        channel: Channel the message was delivered on.
        method_frame: Delivery metadata; only ``delivery_tag`` is read.
        header_frame: Message properties (unused).
        body: Raw message payload, passed through to ``do_work``.
    """
    print(f'on_message thread id: {threading.get_ident()}')
    delivery_tag = method_frame.delivery_tag
    t = threading.Thread(target=do_work, args=(channel, delivery_tag, body))
    t.start()
# Connect to the broker with a 5-second heartbeat and consume from the
# durable queue "standard", one unacknowledged message at a time.
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)
print(f'main thread id: {threading.get_ident()}')
try:
    # Blocks here; on_message starts a new thread for each delivery.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop the consumer before closing the connection.
    channel.stop_consuming()

connection.close()
Note
这种方式经我测试是可以保持长连接,所以我使用这种方法。
其中有2个地方涉及线程切换:一个是接收消息后do_work在新的线程中执行;另一个是ack_message也不在do_work所在的线程里执行(它通过connection.add_callback_threadsafe交回连接所在的线程执行)。
Solution3 这里尝试ack_message不在新线程(即在do_work线程里直接调用channel.basic_ack)
Code mq2_1.py
"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import pika
import threading
import time
def do_work(channel, delivery_tag, body):
    """Simulate ~200 s of work, then acknowledge directly from this thread.

    ``basic_ack`` is called here rather than being handed back to the
    connection. The article reports that this variant fails with
    ``StreamLostError: IndexError('pop from an empty deque')``.

    Args:
        channel: Channel the message was delivered on; used to send the ack.
        delivery_tag: Tag of the delivery to acknowledge once work is done.
        body: Raw message payload; only printed.
    """
    print(f'do_work thread id: {threading.get_ident()}')
    print(body, "start")
    # Simulated long-running task: 10 steps of 20 seconds each.
    for i in range(10):
        print(i)
        time.sleep(20)
    print(body, "end")
    channel.basic_ack(delivery_tag)
def on_message(channel, method_frame, header_frame, body):
    """Start a new thread running ``do_work`` for the delivery and return.

    Args:
        channel: Channel the message was delivered on.
        method_frame: Delivery metadata; only ``delivery_tag`` is read.
        header_frame: Message properties (unused).
        body: Raw message payload, passed through to ``do_work``.
    """
    print(f'on_message thread id: {threading.get_ident()}')
    delivery_tag = method_frame.delivery_tag
    t = threading.Thread(target=do_work, args=(channel, delivery_tag, body))
    t.start()
# Connect to the broker with a 5-second heartbeat and consume from the
# durable queue "standard", one unacknowledged message at a time.
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)
print(f'main thread id: {threading.get_ident()}')
try:
    # Blocks here; on_message starts a new thread for each delivery.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop the consumer before closing the connection.
    channel.stop_consuming()

connection.close()
Result
(Test) [user@user test]$ python mq2_1.py
main thread id: 139832596416320
on_message thread id: 139832596416320
do_work thread id: 139832465696512
b'2' start
0
1
2
3
4
5
6
7
8
9
b'2' end
Traceback (most recent call last):
  File "mq2.py", line 41, in <module>
    channel.start_consuming()
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
    self._process_data_events(time_limit=None)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 824, in process_data_events
    self._flush_output(common_terminator)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: IndexError('pop from an empty deque')
结果可以看到也是报错:IndexError('pop from an empty deque')
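Solution4 这里尝试do_work不在新线程,ack_message在新的一个线程(注意:下面列出的代码与mq2_1.py基本相同,do_work仍然是在新线程中启动的,而下方的输出中并没有"do_work thread id"这一行,说明列出的代码与实际运行的代码不一致,可能是贴错了。)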
Code mq1_2.py
"""
@author: Zhigang Jiang
@date: 2022/1/16
@description:
"""
import functools
import logging
import pika
import threading
import time
def do_work(channel, delivery_tag, body):
    """Simulate ~200 s of work, then acknowledge directly from this thread.

    NOTE(review): this listing is the same as mq2_1.py's ``do_work``; the
    surrounding article describes a different variant (work done in the
    callback thread), so the listing may not be the code that was run.

    Args:
        channel: Channel the message was delivered on; used to send the ack.
        delivery_tag: Tag of the delivery to acknowledge once work is done.
        body: Raw message payload; only printed.
    """
    print(f'do_work thread id: {threading.get_ident()}')
    print(body, "start")
    # Simulated long-running task: 10 steps of 20 seconds each.
    for i in range(10):
        print(i)
        time.sleep(20)
    print(body, "end")
    channel.basic_ack(delivery_tag)
def on_message(channel, method_frame, header_frame, body):
    """Start a new thread running ``do_work`` for the delivery and return.

    Args:
        channel: Channel the message was delivered on.
        method_frame: Delivery metadata; only ``delivery_tag`` is read.
        header_frame: Message properties (unused).
        body: Raw message payload, passed through to ``do_work``.
    """
    print(f'on_message thread id: {threading.get_ident()}')
    delivery_tag = method_frame.delivery_tag
    t = threading.Thread(target=do_work, args=(channel, delivery_tag, body))
    t.start()
# Connect to the broker with a 5-second heartbeat and consume from the
# durable queue "standard", one unacknowledged message at a time.
credentials = pika.PlainCredentials('username', 'password')
parameters = pika.ConnectionParameters('test.webapi.username.com', credentials=credentials, heartbeat=5)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="standard", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('standard', on_message)
print(f'main thread id: {threading.get_ident()}')
try:
    # Blocks here until interrupted or the connection is lost.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop the consumer before closing the connection.
    channel.stop_consuming()

connection.close()
Result
(Test) [user@user test]$ python mq1_2.py
main thread id: 139670376191808
on_message thread id: 139670376191808
b'1' start
0
1
2
3
4
5
6
7
8
9
b'1' end
Traceback (most recent call last):
  File "mq1_2.py", line 46, in <module>
    channel.start_consuming()
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
    self._process_data_events(time_limit=None)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 824, in process_data_events
    self._flush_output(common_terminator)
  File "/home/user/conda/envs/Test/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
这种方式也不行,会报错:ConnectionResetError(104, 'Connection reset by peer')
Conclusion最终使用Solution2方法
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)