gpt4 book ai didi

python - RabbitMQ + 昆布 : write/read to one-time use queues with random names

转载 作者:太空宇宙 更新时间:2023-11-04 04:58:15 24 4
gpt4 key购买 nike

我是消息交换的新手,在寻找适合该任务的手册时遇到了问题。

我需要组织队列池,以便:

  1. 生产者创建一些随机的空队列并将所有消息包写入其中(通常为 100 条消息)。

  2. 消费者找到非空和非锁定队列并从中读取直到它是空的,然后删除它并寻找下一个。

所以我的任务是将消息打包处理,我了解如何在一个队列中使用相同的键生成和使用消息,但找不到如何处理队列池。

我们可以让多个生产者和消费者并行运行,但是他们中的哪一个发送给谁是无关紧要的。 我们不需要也永远不能将特定的生产者与特定的消费者联系起来。

一般任务:我们有很多客户端要接收推送通知,我们通过一些参数将推送分组以稍后作为组进行处理,所以这样的组应该在 RabbitMQ 中的一个队列中生成并作为一个组消费,但每个组都独立于其他组。

非常感谢 Hannu 的帮助:他简单而强大的解决方案的关键思想是我们可以拥有一个已知名称的持久队列,生产者将写入已创建队列的名称,消费者将读取这些名称从那里开始。

为了使他的解决方案更具可读性和更容易工作,在我的个人任务中,我将生产者中的 publish_data() 分为两个函数 - 一个生成随机队列并将其写入 control_queue 另一个接收这个 random_queue 并用消息填充它。类似的想法对消费者有好处 - 一个函数处理队列,另一个函数将被调用来处理消息本身。

最佳答案

我曾做过类似的事情,但使用的是 Pika。我不得不为示例清理和组合旧代码片段。它可能不是很复杂(这绝对是我使用它编写的第一个代码片段),但这就是我解决它的方法。基本上我会设置一个已知名称的控制队列。

发布者将为一组消息创建一个随机队列名称,将 N 条消息转储到其中(在我的例子中是编号 1-42),然后将队列名称发布到控制队列。然后消费者收到这个队列名称,绑定(bind)到它,读取消息直到队列为空,然后删除队列。

这使事情变得相对简单,因为发布者不需要弄清楚他们可以在哪里发布他们的数据组(每个队列都是新的,具有随机名称)。接收者不需要担心超时或“全部完成”消息,因为只有当一组数据已写入队列并且每条消息都在那里等待时,接收者才会收到队列名称。

也无需修补锁或信号或任何其他会使事情复杂化的东西。您可以拥有任意数量的消费者和生产者。当然,使用交换器和路由键,可能会有不同的消费者组来完成不同的任务等。

发布者

from kombu import Connection
import uuid
from time import sleep
def publish_data(conn):
random_name= "q" + str(uuid.uuid4()).replace("-", "")
random_queue = conn.SimpleQueue(random_name)
for i in xrange(0, 42):
random_queue.put(i)
random_queue.close()
return random_name


with Connection('amqp://guest:guest@localhost:5672//') as conn:
control_queue = conn.SimpleQueue('control_queue')
_a = 0
while True:
y_name = publish_data(conn)
message = y_name
control_queue.put(message)
print('Sent: {0}'.format(message))
_a += 1
sleep(0.3)
if _a > 20:
break

control_queue.close()

消费者

from Queue import Empty

from kombu import Connection, Queue


def process_msg(foo):
print str(foo)
with Connection("amqp://guest:guest@localhost:5672//") as _conn:
sub_queue = _conn.SimpleQueue(str(foo))
while True:
try:
_msg = sub_queue.get(block=False)
print _msg.payload
_msg.ack()
except Empty:
break
sub_queue.close()
chan = _conn.channel()
dq = Queue(name=str(foo), exchange="")
bdq = dq(chan)
bdq.delete()


with Connection('amqp://guest:guest@localhost:5672//') as conn:
rec = conn.SimpleQueue('control_queue')
while True:
msg = rec.get(block=True)
entry = msg.payload
msg.ack()
process_msg(entry)

关于python - RabbitMQ + 昆布 : write/read to one-time use queues with random names,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46511133/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com