- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有两个独立的 RabbitMQ 实例。我正试图找到最好的方式来收听来自两者的事件。
例如,我可以使用以下方式消费事件:
credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
我有第二个主持人,“host2”,我也想听听。我考虑过创建两个单独的线程来执行此操作,但据我所知,鼠兔不是线程安全的。有没有更好的办法?或者创建两个单独的线程,每个线程监听不同的 Rabbit 实例(host1 和 host2)就足够了吗?
最佳答案
“什么是最好的方法”的答案在很大程度上取决于您的队列使用模式以及“最佳”的含义。由于我还不能对问题发表评论,我只会尝试提出一些可能的解决方案。
在每个示例中,我都假设交换已经声明。
您可以使用 pika
在单个进程中使用来自不同主机上的两个队列的消息.
你是对的 - 作为 its own FAQ states , pika
不是线程安全的,但它可以通过为每个线程创建到 RabbitMQ 主机的连接以多线程方式使用。使用 threading
使此示例在线程中运行模块如下所示:
import pika
import threading
class ConsumerThread(threading.Thread):
def __init__(self, host, *args, **kwargs):
super(ConsumerThread, self).__init__(*args, **kwargs)
self._host = host
# Not necessarily a method.
def callback_func(self, channel, method, properties, body):
print("{} received '{}'".format(self.name, body))
def run(self):
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self._host,
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(self.callback_func,
result.method.queue,
no_ack=True)
channel.start_consuming()
if __name__ == "__main__":
threads = [ConsumerThread("host1"), ConsumerThread("host2")]
for thread in threads:
thread.start()
我已将 callback_func
声明为一种纯粹用于在打印消息正文时使用 ConsumerThread.name
的方法。它也可能是 ConsumerThread
类之外的函数。
或者,您始终可以只为每个要使用事件的队列运行一个带有使用者代码的进程。
import pika
import sys
def callback_func(channel, method, properties, body):
print(body)
if __name__ == "__main__":
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=sys.argv[1],
credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
exchange="my-exchange",
routing_key="*.*.*.*.*")
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
然后运行:
$ python single_consume.py host1
$ python single_consume.py host2 # e.g. on another console
如果您对来自队列的消息所做的工作是 CPU-heavy并且只要您的 CPU 中的内核数量 >= 消费者数量,通常最好使用这种方法 - 除非您的队列大部分时间都是空的并且消费者不会利用此 CPU 时间*。
另一种选择是涉及一些异步框架(例如 Twisted
)并在单线程中运行整个过程。
您不能再在异步代码中使用 BlockingConnection
;幸运的是,pika
有 Twisted
的适配器:
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log
class Consumer(object):
def on_connected(self, connection):
d = connection.channel()
d.addCallback(self.got_channel)
d.addCallback(self.queue_declared)
d.addCallback(self.queue_bound)
d.addCallback(self.handle_deliveries)
d.addErrback(log.err)
def got_channel(self, channel):
self.channel = channel
return self.channel.queue_declare(exclusive=True)
def queue_declared(self, queue):
self._queue_name = queue.method.queue
self.channel.queue_bind(queue=self._queue_name,
exchange="my-exchange",
routing_key="*.*.*.*.*")
def queue_bound(self, ignored):
return self.channel.basic_consume(queue=self._queue_name)
def handle_deliveries(self, queue_and_consumer_tag):
queue, consumer_tag = queue_and_consumer_tag
self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
return self.looping_call.start(0)
def consume_from_queue(self, queue):
d = queue.get()
return d.addCallback(lambda result: self.handle_payload(*result))
def handle_payload(self, channel, method, properties, body):
print(body)
if __name__ == "__main__":
consumer1 = Consumer()
consumer2 = Consumer()
parameters = ConnectionParameters()
cc = protocol.ClientCreator(reactor,
TwistedProtocolConnection,
parameters)
d1 = cc.connectTCP("host1", 5672)
d1.addCallback(lambda protocol: protocol.ready)
d1.addCallback(consumer1.on_connected)
d1.addErrback(log.err)
d2 = cc.connectTCP("host2", 5672)
d2.addCallback(lambda protocol: protocol.ready)
d2.addCallback(consumer2.on_connected)
d2.addErrback(log.err)
reactor.run()
这种方法会更好,您消耗的队列越多,消费者执行的工作对 CPU 的限制就越少*。
既然你提到了 pika
,我将自己限制在基于 Python 2.x 的解决方案上,因为 pika
尚未移植。
但是如果你想移动到 >=3.3,一个可能的选择是使用 asyncio
使用 AMQP 协议(protocol)之一(您与 RabbitMQ 交谈的协议(protocol)),例如asynqp
或 aioamqp
.
* - 请注意,这些都是非常肤浅的技巧 - 在大多数情况下,选择并不那么明显;什么对你最好取决于队列“饱和度”(消息/时间),你在收到这些消息后做了什么工作,你在什么环境中运行你的消费者等等;除了对所有实现进行基准测试外,没有其他方法可以确定
关于Python 和 RabbitMQ - 收听来自多个 channel 的消费事件的最佳方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28550140/
我一直在阅读有关汇编函数的内容,但对于是使用进入和退出还是仅使用调用/返回指令来快速执行,我感到很困惑。一种方式快而另一种方式更小吗?例如,在不内联函数的情况下,在汇编中执行此操作的最快(stdcal
我正在处理一个元组列表,如下所示: res = [('stori', 'JJ'), ('man', 'NN'), ('unnatur', 'JJ'), ('feel', 'NN'), ('pig',
最近我一直在做很多网络或 IO 绑定(bind)操作,使用线程有助于加快代码速度。我注意到我一直在一遍又一遍地编写这样的代码: threads = [] for machine, user, data
假设我有一个名为 user_stats 的资源,其中包含用户拥有的帖子、评论、喜欢和关注者的数量。是否有一种 RESTful 方式只询问该统计数据的一部分(即,对于 user_stats/3,请告诉我
我有一个简单的 api,它的工作原理是这样的: 用户创建一个请求 ( POST /requests ) 另一个用户检索所有请求 ( GET /requests ) 然后向请求添加报价 ( POST /
考虑以下 CDK Python 中的示例(对于这个问题,不需要 AWS 知识,这应该对基本上任何构建器模式都有效,我只是在这个示例中使用 CDK,因为我使用这个库遇到了这个问题。): from aws
Scala 中管理对象池的首选方法是什么? 我需要单线程创建和删除大规模对象(不需要同步)。在 C++ 中,我使用了静态对象数组。 在 Scala 中处理它的惯用和有效方法是什么? 最佳答案 我会把它
我有一个带有一些内置方法的类。这是该类的抽象示例: class Foo: def __init__(self): self.a = 0 self.b = 0
返回和检查方法执行的 Pythonic 方式 我目前在 python 代码中使用 golang 编码风格,决定移动 pythonic 方式 例子: import sys from typing imp
我正在开发一个 RESTful API。其中一个 URL 允许调用者通过 id 请求特定人员的记录。 返回该 id 不存在的记录的常规值是什么?服务器是否应该发回一个空对象或者一个 404,或者其他什
我正在使用 pathlib.Path() 检查文件是否存在,并使用 rasterio 将其作为图像打开. filename = pathlib.Path("./my_file-name.tif") 但
我正在寻找一种 Pythonic 方式来从列表和字典创建嵌套字典。以下两个语句产生相同的结果: a = [3, 4] b = {'a': 1, 'b': 2} c = dict(zip(b, a))
我有一个正在操裁剪理设备的脚本。设备有时会发生物理故障,当它发生时,我想重置设备并继续执行脚本。我有这个: while True: do_device_control() device
做组合别名的最pythonic和正确的方法是什么? 这是一个假设的场景: class House: def cleanup(self, arg1, arg2, kwarg1=False):
我正在开发一个小型客户端服务器程序来收集订单。我想以“REST(ful)方式”来做到这一点。 我想做的是: 收集所有订单行(产品和数量)并将完整订单发送到服务器 目前我看到有两种选择: 将每个订单行发
我知道在 Groovy 中您可以使用字符串调用类/对象上的方法。例如: Foo."get"(1) /* or */ String meth = "get" Foo."$meth"(1) 有没有办法
在 ECMAScript6 中,您可以使用扩展运算符来解构这样的对象 const {a, ...rest} = obj; 它将 obj 浅拷贝到 rest,不带属性 a。 有没有一种干净的方法可以在
我有几个函数返回数字或None。我希望我的包装函数返回第一个不是 None 的结果。除了下面的方法之外,还有其他方法吗? def func1(): return None def func2(
假设我想设计一个 REST api 来讨论歌曲、专辑和艺术家(实际上我就是这样做的,就像我之前的 1312414 个人一样)。 歌曲资源始终与其所属专辑相关联。相反,专辑资源与其包含的所有歌曲相关联。
这是我认为必须经常出现的问题,但我一直无法找到一个好的解决方案。假设我有一个函数,它可以作为参数传递一个开放资源(如文件或数据库连接对象),或者需要自己创建一个。如果函数需要自己打开文件,最佳实践通常
我是一名优秀的程序员,十分优秀!