gpt4 book ai didi

java - activemq - 等待所有消息被消费

转载 作者:搜寻专家 更新时间:2023-11-01 02:27:01 24 4
gpt4 key购买 nike

我有一个案例,其中有一个正在处理多个项目的批量操作。处理完所有项目后,我必须将操作状态更新为已完成。项目由多个消费者并行处理。

理论上,消费者在处理一个项目后可以检查是否没有剩余项目(或队列中没有消息用于此操作),但有可能两个消费者(A 和 B)同时完成,他们两者同时检查并且他们都看到另一个还没有准备好(因为交易尚未提交) - 消费者 A 不会看到消费者 B 所做的更改,消费者 B 也不会看到消费者 A 所做的更改,所以他们都不会更新 Action 状态。我说得对吗?

如何在没有某种额外的状态定期检查且没有开销的情况下实现这样的条件?如果每个操作有数千个项目,定期检查可能会很好,但如果通常有 1-2 个长时间运行的项目,则效率非常低。

谢谢!

编辑:简而言之 - 在处理一组消息后触发某些操作的正确方法是什么,但是:

  • 消息必须并行处理
  • 定期检查是否所有消息都已处理不是解决问题的方法

最佳答案

您需要一流的批处理设施。仅仅依靠队列的大小是不够可靠的。例如,您可以让一个进程处理消息然后拒绝它,从而将消息放回队列(之前是“空的”)。

相反,让批处理成为一流的概念。考虑发送包含批处理中项目数的“批处理开始”消息。然后在处理消息时,它们可以更新批处理状态记录或其他一些设备。批处理状态可以跟踪处理的消息数、通过数、失败数等。

当处理完最后一条消息时,它可以检查批处理状态以查看它是否是“最后一条消息”,方法是查看处理的消息计数是否与批处理计数“减 1”相匹配(因为它正在运行最后一条消息)。

您需要使此过程原子化,例如,如果您使用 SQL,您将观察获取批处理状态行“FOR UPDATE”,这会将行锁定到您的事务,从而锁定您的比较可以是原子的。

您也可以在该行上放置一个触发器,并让它检查是否符合您的风格。

或者你可以在你的系统上有一个全局对象来为你管理它。各种机制。

但关键是您有一些总体的批处理概念来管理所有工作人员。您无法在单个工作人员级别执行此操作,这并不可靠。

关于java - activemq - 等待所有消息被消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19848884/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com