gpt4 book ai didi

message-queue - 使用 RabbitMq 锁定并批量获取消息

转载 作者:行者123 更新时间:2023-12-03 07:47:10 25 4
gpt4 key购买 nike

我正在尝试以一种更非常规的方式使用 RabbitMq(尽管此时我可以根据需要选择任何其他消息队列实现)。消费者没有将 Rabbit 推送消息留给我的消费者,而是连接到一个队列并获取一批 N 条消息(在此期间它消耗了一些消息,并可能拒绝一些消息),之后它跳转到另一个队列,依此类推。这样做是为了冗余。如果某些消费者崩溃,所有消息都保证被其他消费者消费。

问题是我有多个消费者,我不希望他们竞争同一个队列。有没有办法保证队列上的锁?如果不是,我至少可以确保如果两个消费者连接到同一个队列,他们不会读取相同的消息吗?事务可能在某种程度上对我有帮助,但我听说它们将从 RabbitMQ 中删除。

也欢迎其他架构建议。

谢谢!

编辑:正如评论中指出的,我需要如何处理消息有一个特殊性。它们只有在分组时才有意义,并且相关消息很有可能聚集在队列中。例如,如果我提取一批 100 条消息,那么我很有可能能够对消息 1-3、4-5,6-10 等执行某些操作。如果我无法找到某些消息的组,我会会将它们重新提交到队列中。 WorkQueue 不起作用,因为它将消息从同一组传播到多个 worker ,而这些 worker 不知道如何处理它们。

最佳答案

您看过 Enterprise Integration Patterns 上的这本免费在线书籍吗? ?

听起来您确实需要一个工作流程,在消息到达您的工作人员之前,您需要一个批处理组件。使用 RabbitMQ 有两种方法可以做到这一点。要么使用一种可以为您进行批处理的交换类型(和消息格式),要么使用一个队列,以及一个对批处理进行排序并将每个批处理放入其自己的队列中的工作人员。批处理程序可能还应该向控制队列发送“批处理就绪”消息,以便工作人员可以发现新批处理队列的存在。处理完批处理后,工作人员可以删除批处理队列。

如果您可以控制消息格式,则可以让 RabbitMQ 通过多种方式隐式执行批处理。通过主题交换,您可以确保每条消息上的路由 key 的格式为 work.batchid.something,然后获悉批处理 xxyzz 存在的工作人员将使用像 #.xxyzz.# 这样的绑定(bind) key 来仅消费这些消息。无需重新发布。

另一种方法是在 header 中包含批处理 ID 并使用较新的 header 交换类型。当然,如果您愿意编写少量的 Erlang 代码,您也可以实现自己的自定义交换类型。

不过,我确实建议您查看这本书,因为它比大多数人开始使用的典型工作队列概念更好地概述了消息传递架构。

关于message-queue - 使用 RabbitMq 锁定并批量获取消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8985643/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com