gpt4 book ai didi

java - 我可以使用自定义算法而不是使用 RabbitMQ 的循环调度消息吗?

转载 作者:塔克拉玛干 更新时间:2023-11-03 02:59:02 25 4
gpt4 key购买 nike

我正在使用 RabbitMQ 的循环功能在多个消费者之间发送消息,但一次只有一个消费者接收实际消息。

我的问题是我的消息代表任务,我想在我的消费者上有本地 session (状态)。我事先知道哪些消息属于哪个 session ,但我不知道使用我指定的算法将 RabbitMQ 发送给消费者的最佳方法是什么(或者有什么方法吗?)。

我不想编写自己的编排服务,因为它会成为瓶颈,而且我不想让我的生产者知道哪个消费者会接收他们的消息,因为我会失去使用 Rabbit 获得的解耦。

有没有办法让 RabbitMQ 根据预定义的算法/规则而不是循环法将我的消息发送给消费者?

说明:我使用了几个用不同语言编写的微服务,每个服务都有自己的工作。我使用 protobuf 消息在它们之间进行通信。我给每条新消息一个 UUID。如果消费者收到一条消息,它可以从中创建一条响应消息(这可能不是正确的术语,因为生产者和消费者是解耦的,他们彼此不了解)和这个UUID 被复制到新消息中。这形成了一个数据转换管道,这个“进程”UUID(processId)标识。我的问题是,我可能有多个 worker 消费者,我需要一个 worker 坚持到一个UUID,如果它以前见过它的话。我有这个需求是因为

  1. 每个进程可能都有本地状态
  2. 进程完成后我想清理本地状态
  3. 一个微服务可能会收到同一个进程的多个消息,我需要区分哪个消息属于哪个进程

由于 RabbitMQ 使用循环法在工作人员之间分配任务,因此我不能强制我的进程坚持工作人员。我有几个警告:

  • 生产者与消费者分离,因此直接消息传递不是一种选择
  • worker 的数量不是恒定的(有一个负载均衡器可能会启动一个 worker 的新实例)

如果有不涉及更改循环算法且不破坏我的约束的变通方法,那也是可以的!

最佳答案

如果您不想使用编排服务,您可以尝试使用这样的拓扑结构:

enter image description here

为简单起见,我假设您的 processId 用作路由键(在现实世界中,您可能希望将其存储在 header 中并使用 header 交换相反)。

传入消息将被 Incoming Exchange(类型:直接)接受,它有一个 alternative-exchange属性设置为指向 No Session Exchange(扇出)。

这是 RabbitMQ 文档在“替代交换”上所说的内容:

It is sometimes desirable to let clients handle messages that an exchange was unable to route (i.e. either because there were no bound queues our no matching bindings).

Typical examples of this are

  • detecting when clients accidentally or maliciously publish messages that cannot be routed
  • "or else" routing semantics where some messages are handled specially and the rest by a generic handler

RabbitMQ's Alternate Exchange ("AE") feature addresses these use cases.

(我们对这里的or else用例特别感兴趣)

每个消费者都将创建自己的队列并将其绑定(bind)到 Incoming Exchange,使用 processId(s) 作为目前已知的 session ,作为绑定(bind)的路由键。

这样它只会收到它感兴趣的 session 的消息。

此外,所有消费者都将绑定(bind)到共享的No Session Queue

如果收到一 strip 有以前未知的 processId 的消息,则不会为它注册到 Incoming Exchange 的特定绑定(bind),因此它会被重新路由到没有 session 交换 => 没有 session 队列并以通常的(循环)方式分派(dispatch)给其中一个消费者。

然后消费者将通过 Incoming Exchange 为它注册一个新的绑定(bind)(即开始一个新的“ session ”),这样它将通过这个 获取所有后续消息processId.

一旦“ session ”结束,它将必须删除相应的绑定(bind)(即关闭“ session ”)。

关于java - 我可以使用自定义算法而不是使用 RabbitMQ 的循环调度消息吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43001689/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com