gpt4 book ai didi

rabbitmq - 如何根据条件限制并发消息消耗

转载 作者:行者123 更新时间:2023-12-02 17:21:43 26 4
gpt4 key购买 nike

场景(我已经简化了事情):

  • 许多最终用户可以从前端网络应用程序(生产者)开始作业(繁重的作业,例如渲染大型 PDF)。
  • 作业被发送到单个持久 RabbitMQ 队列。
  • 许多工作器应用程序(使用者)处理这些作业并将结果写回数据存储区。

这个相当标准的模式运行良好。

问题:如果用户在同一分钟内启动 10 个作业,并且一天中的那个时间只有 10 个工作应用程序启动,则该最终用户实际上接管了自己的所有计算时间.

问题:如何确保在任何时候每个最终用户只处理一项作业? (奖励:某些最终用户(例如管理员)不得受到限制)

此外,我不希望前端应用程序阻止最终用户启动并发作业。我只是希望最终用户等待他们的并发作业一次完成一个。

解决方案?:我应该为每个最终用户动态创建一个自动删除独占队列吗?如果是,我如何告诉工作应用程序开始使用这个队列?如何确保一个(且只有一个)worker 将从该队列中消费?

最佳答案

正如 Dimos 所说,您需要自己构建一些东西来实现这一点。这是一个替代实现,需要额外的队列和一些持久存储。

  • 除了现有作业队列外,还创建一个“可处理作业队列”。只有满足您的业务规则的作业才会添加到此队列中。
  • 为作业队列创建一个使用者(名为“Limiter”)。 Limiter还需要持久存储(例如Redis或关系数据库)来记录当前正在处理哪些作业。限制器从作业队列读取并写入可处理作业队列。
  • 当工作线程应用程序完成处理作业时,它会将“作业完成”事件添加到作业队列中。

    ------------     ------------     ----------- 
    | Producer | -> () job queue ) -> | Limiter |
    ------------ ------------ -----------
    ^ |
    | V
    | ------------------------
    | () processable job queue )
    job finished | ------------------------
    | |
    | V
    | ------------------------
    \-----| Job Processors (x10) |
    ------------------------

限制器的逻辑如下:

  • 收到作业消息后,检查持久存储以查看当前用户是否已在运行作业:
    • 如果没有,则将存储中的作业记录为正在运行,并将作业消息添加到可处理作业队列中。
    • 如果现有作业正在运行,则将该作业在存储中记录为待处理作业。
    • 如果作业是针对管理员用户的,请始终将其添加到可处理的作业队列中。
  • 收到“作业已完成”消息后,从持久存储中的“正在运行的作业”列表中删除该作业。然后检查该用户的待处理作业的存储:
    • 如果找到作业,则将该作业的状态从待处理更改为正在运行,并将其添加到可处理作业队列中。
    • 否则,什么都不做。
  • 一次只能运行一个限制器进程实例。这可以通过仅启动限制器进程的单个实例或通过在持久存储中使用锁定机制来实现。

它相当重量级,但如果您需要查看发生了什么,您可以随时检查持久存储。

关于rabbitmq - 如何根据条件限制并发消息消耗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28414484/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com