gpt4 book ai didi

python - RabbitMQ 破损管道错误或丢失消息

转载 作者:太空狗 更新时间:2023-10-30 00:01:28 24 4
gpt4 key购买 nike

使用pika库的BlockingConnection连接RabbitMQ,发布消息偶尔报错:

Fatal Socket Error: error(32, 'Broken pipe')

这是一个非常简单的子进程,它从内存中的队列中取出一些信息并将一个小的 JSON 消息发送到 AMQP。当系统在几分钟内未发送任何消息时,似乎只会出现此错误。

设置:

connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
exchange='xyz',
exchange_type='fanout',
passive=False,
durable=True,
auto_delete=False
)

排队代码捕获任何连接错误并重试:

def _enqueue(self, message_id, data):
try:
published = self.channel.basic_publish(
self.amqp_exchange,
self.amqp_routing_key,
json.dumps(data),
pika.BasicProperties(
content_type="application/json",
delivery_mode=2,
message_id=message_id
)
)

# Confirm delivery or retry
if published:
self.retry_count = 0
else:
raise EnqueueException("Message publish not confirmed.")

except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
pika.exceptions.UnroutableError, socket.timeout) as e:
self.retry_count += 1
if self.retry_count < 5:
logging.warning("Reconnecting and resending")
if self.connection.is_open:
self.connection.close()
self.connect()
self._enqueue(message_id, data)
else:
raise e

这有时会在第二次尝试时起作用。它通常会挂起一段时间或只是在最终抛出异常之前丢弃消息 ( possibly related bug report )。因为它只发生在系统安静几分钟时,我猜这是由于连接超时。但是 AMQP 有一个心跳系统,据报道 pika 使用它(related bug report)。

为什么我会收到此错误或丢失消息,为什么连接在不使用时不会保持打开状态?

最佳答案

来自另一个bug report :

As BlockingConnection doesn't handle heartbeats in the background and the heartbeat_interval can't override the servers suggested heartbeat interval (that's a bug too), i suggest that heartbeats should be disabled by default (rely on TCP keep-alive instead).

If processing a task in a consume block takes longer time then the server suggested heartbeat interval, the connection will be closed by the server and the client won't be able to ack the message when it's done processing.

update在 v1.0.0 中可能有助于解决这个问题。

所以我实现了一个解决方法。每 30 秒我通过队列发布一条心跳消息。这使连接保持打开状态,并具有向客户确认我的应用程序已启动并正在运行的额外好处。

关于python - RabbitMQ 破损管道错误或丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45064662/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com