gpt4 book ai didi

python - 如何使用 py-amqplib 在多个队列上等待消息

转载 作者:行者123 更新时间:2023-11-28 18:55:48 24 4
gpt4 key购买 nike

我正在使用 py-amqplib 在 Python 中访问 RabbitMQ。应用程序会不时收到监听某些 MQ 主题的请求。

第一次收到这样的请求时,它会创建一个 AMQP 连接和一个 channel ,并启动一个新线程来监听消息:

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()

listener = AMQPListener(channel)
listener.start()

AMQPListener 非常简单:

class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel

def run(self):
while True:
self.__channel.wait()

创建连接后,它会订阅感兴趣的主题,如下所示:

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

这是第一次一切正常。但是,它在后续请求订阅另一个主题时失败。在后续请求中,我重新使用 AMQP 连接和 AMQPListener 线程(因为我不想为每个主题启动一个新线程),当我调用 channel.queue_declare() 上方的代码块时方法调用永远不会返回。那时我还尝试创建一个新 channel ,但 connection.channel() 调用也永远不会返回。

我能够让它工作的唯一方法是为每个主题(即 routing_key)创建一个新的连接、 channel 和监听器线程,但这确实不理想。我怀疑是 wait() 方法以某种方式阻塞了整个连接,但我不确定该怎么做。我当然应该能够使用单个监听器线程接收具有多个路由键(甚至在多个 channel 上)的消息?

一个相关的问题是:当该主题不再令人感兴趣时,我如何停止监听器线程?如果没有消息,channel.wait() 调用似乎会永远阻塞。我能想到的唯一方法是向队列发送一条虚拟消息,使其“中毒”,即。被听者解释为停止的信号。

最佳答案

如果您希望每个 channel 有多个消费者,只需使用 basic_consume() 附加另一个,然后使用 channel.wait()。它将监听通过 basic_consume() 连接的所有队列。确保为每个 basic_consume() 定义不同的消费者标签。

如果您想取消队列中的特定消费者(取消收听特定主题),请使用channel.basic_cancel(consumer_tag)

关于python - 如何使用 py-amqplib 在多个队列上等待消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/1807113/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com