gpt4 book ai didi

Python 和 RabbitMQ - 收听来自多个 channel 的消费事件的最佳方式?

转载 作者:太空狗 更新时间:2023-10-29 18:13:57 25 4
gpt4 key购买 nike

我有两个独立的 RabbitMQ 实例。我正试图找到最好的方式来收听来自两者的事件。

例如,我可以使用以下方式消费事件:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()

我有第二个主持人,“host2”,我也想听听。我考虑过创建两个单独的线程来执行此操作,但据我所知,鼠兔不是线程安全的。有没有更好的办法?或者创建两个单独的线程,每个线程监听不同的 Rabbit 实例(host1 和 host2)就足够了吗?

最佳答案

“什么是最好的方法”的答案在很大程度上取决于您的队列使用模式以及“最佳”的含义。由于我还不能对问题发表评论,我只会尝试提出一些可能的解决方案。

在每个示例中,我都假设交换已经声明。

线程

您可以使用 pika 在单个进程中使用来自不同主机上的两个队列的消息.

你是对的 - 作为 its own FAQ states , pika 不是线程安全的,但它可以通过为每个线程创建到 RabbitMQ 主机的连接以多线程方式使用。使用 threading 使此示例在线程中运行模块如下所示:

import pika
import threading


class ConsumerThread(threading.Thread):
def __init__(self, host, *args, **kwargs):
super(ConsumerThread, self).__init__(*args, **kwargs)

self._host = host

# Not necessarily a method.
def callback_func(self, channel, method, properties, body):
print("{} received '{}'".format(self.name, body))

def run(self):
credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self._host,
credentials=credentials))

channel = connection.channel()

result = channel.queue_declare(exclusive=True)

channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")

channel.basic_consume(self.callback_func,
result.method.queue,
no_ack=True)

channel.start_consuming()


if __name__ == "__main__":
threads = [ConsumerThread("host1"), ConsumerThread("host2")]
for thread in threads:
thread.start()

我已将 callback_func 声明为一种纯粹用于在打印消息正文时使用 ConsumerThread.name 的方法。它也可能是 ConsumerThread 类之外的函数。

进程

或者,您始终可以只为每个要使用事件的队列运行一个带有使用者代码的进程。

import pika
import sys


def callback_func(channel, method, properties, body):
print(body)


if __name__ == "__main__":
credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
pika.ConnectionParameters(host=sys.argv[1],
credentials=credentials))

channel = connection.channel()

result = channel.queue_declare(exclusive=True)

channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")

channel.basic_consume(callback_func, result.method.queue, no_ack=True)

channel.start_consuming()

然后运行:

$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console

如果您对来自队列的消息所做的工作是 CPU-heavy并且只要您的 CPU 中的内核数量 >= 消费者数量,通常最好使用这种方法 - 除非您的队列大部分时间都是空的并且消费者不会利用此 CPU 时间*。

异步

另一种选择是涉及一些异步框架(例如 Twisted )并在单线程中运行整个过程。

您不能再在异步代码中使用 BlockingConnection;幸运的是,pikaTwisted 的适配器:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)

def got_channel(self, channel):
self.channel = channel

return self.channel.queue_declare(exclusive=True)

def queue_declared(self, queue):
self._queue_name = queue.method.queue

self.channel.queue_bind(queue=self._queue_name,
exchange="my-exchange",
routing_key="*.*.*.*.*")

def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)

def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

return self.looping_call.start(0)

def consume_from_queue(self, queue):
d = queue.get()

return d.addCallback(lambda result: self.handle_payload(*result))

def handle_payload(self, channel, method, properties, body):
print(body)


if __name__ == "__main__":
consumer1 = Consumer()
consumer2 = Consumer()

parameters = ConnectionParameters()
cc = protocol.ClientCreator(reactor,
TwistedProtocolConnection,
parameters)
d1 = cc.connectTCP("host1", 5672)
d1.addCallback(lambda protocol: protocol.ready)
d1.addCallback(consumer1.on_connected)
d1.addErrback(log.err)

d2 = cc.connectTCP("host2", 5672)
d2.addCallback(lambda protocol: protocol.ready)
d2.addCallback(consumer2.on_connected)
d2.addErrback(log.err)

reactor.run()

这种方法会更好,您消耗的队列越多,消费者执行的工作对 CPU 的限制就越少*。

python 3

既然你提到了 pika,我将自己限制在基于 Python 2.x 的解决方案上,因为 pika 尚未移植。

但是如果你想移动到 >=3.3,一个可能的选择是使用 asyncio使用 AMQP 协议(protocol)之一(您与 RabbitMQ 交谈的协议(protocol)),例如asynqpaioamqp .

* - 请注意,这些都是非常肤浅的技巧 - 在大多数情况下,选择并不那么明显;什么对你最好取决于队列“饱和度”(消息/时间),你在收到这些消息后做了什么工作,你在什么环境中运行你的消费者等等;除了对所有实现进行基准测试外,没有其他方法可以确定

关于Python 和 RabbitMQ - 收听来自多个 channel 的消费事件的最佳方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28550140/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com