gpt4 book ai didi

python - 鼠兔与 celery ,连接已关闭

转载 作者:行者123 更新时间:2023-12-01 04:57:05 25 4
gpt4 key购买 nike

我使用 celery 和 RabbitMQ 来运行一些任务,有时我需要将消息从工作人员返回到 RabbitMQ,所以我使用 pika。

我目前正在使用 BlockingConnection() 来连接到 RabbitMQ,但过了一会儿我收到异常“连接丢失”。

我相信发生这种情况是因为 celery 是异步的,并且我正在使用 BlockingConnection()。

这是我的代码:

class RabbitConnection(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=RABBITMQ_OUT_NAME, durable=True)
self.channel.confirm_delivery()

def add_alert(self, new_alert):
message = new_alert.to_json()
delivered = self.channel.basic_publish(exchange='',
routing_key=RABBITMQ_OUT_NAME,
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json',
))

我应该使用不同的连接吗?如果是这样我应该如何使用它?

最佳答案

听起来这可能是线程问题。您可以通过多个线程使用 Pika 处理请求,但理想情况下每个线程应该有一个连接,或 use locking 。我建议您使用线程安全库,而不是给代码增加额外的复杂性;如amqp-stormrabbitpy .

如果您使用我的 AMQP-Storm 库来实现此功能,代码将如下所示。

import amqpstorm

class RabbitConnection(object):
def __init__(self):
self.connection = amqpstorm.Connection('localhost', 'guest', 'guest')
self.channel = self.connection.channel()
self.channel.queue.declare(queue=RABBITMQ_OUT_NAME, durable=True)
self.channel.confirm_deliveries()

def add_alert(self, new_alert):
message = new_alert.to_json()
delivered = self.channel.basic.publish(exchange='',
routing_key=RABBITMQ_OUT_NAME,
body=message,
properties={
'delivery_mode': 2,
'content_type': 'application/json',
})

关于python - 鼠兔与 celery ,连接已关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27145334/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com