gpt4 book ai didi

synchronization - 如何将消费者与rabbitmq同步

转载 作者:行者123 更新时间:2023-12-03 19:36:52 24 4
gpt4 key购买 nike

我有以下问题:
我有一个 RabbitMQ 集群、一个消息生产者和一个消费者集群(用于高可用性)。
每当消费者收到消息时,它就会根据消息内容生成另一个进程并运行它。这个过程很长,大约需要 30 分钟。

我必须确保一次处理一条消息。但是,消费者多于一个,因此如果队列中有 2 条消息,则一个消费者获得一条消息,第二个消费者获得另一条消息,并且它们是并行处理的。

供引用:每个消费者都驻留在不同的机器上。

是否有任何 RabbitMQ 级别的机制可以让我等待消费下一条消息,直到前一条消息被确认?还是我必须在服务器之间开发一些锁定机制?

最佳答案

我只是从rabbitmq 文档中提到我认为能够满足您的要求的几点。

  • 第一个引用 - https://www.rabbitmq.com/consumer-priority.html

  • Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.



    我假设您的所有消费者都有相同的优先级,因此消息将平均分配给所有活跃的消费者。
  • 第二个引用 - https://www.rabbitmq.com/consumer-prefetch.html

  • the basic.qos method to allow you to limit the number of unacknowledged messages on a channel (or connection) when consuming



    正如您所提到的,您将在一台机器上拥有单个消费者,这更容易。

    只需为每个消费者设置消费者预取限制 1。因此,在要求确认之前,服务器将只向消费者传递一条消息。并在您的消息完全处理后发送基本确认。
    Channel channel = ...;
    Consumer consumer = ...;
    channel.basicQos(1); // Per consumer limit

    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

    System.out.println("received message");
    // process the message .. time consuming

    // after processing send the basic ack so that next message can be received from queue
    channel.basicAck(envelope.getDeliveryTag(), false);
    };

    channel.basicConsume("my-queue", false, consumer);

    我希望这有帮助。

    更新 -

    只是为了添加更多描述 -
    当你使用
    channel.basicQos(x);

    Rabbitmq 将向 channel 上的每个消费者推送最多 x 条未确认消息(如果队列中可用,当然在遵守优先级和循环等之后)。这意味着 channel 上的每个消费者不会有超过 x 条未确认的消息,也就是说,消费者可以在任何给定时刻同时处理最多 x 条消息。一旦消费者发回 ack,下一条消息就可以推送给它。如果消费者觉得无法处理消息,也可以发送 nack。在这种情况下,消息将被重新排队,并且重新排队的消息可能会根据优先级、循环等方式到达队列中的任何消费者。

    每个 channel 可以有多个消费者。所以,当你使用
    channel.basicQos(x, true);

    限制 x 适用于整个 channel ,而不是 channel 上的单个/每个消费者。

    在您的情况下,每个 channel 上只有一个消费者。所以 channel 限制实际上对你的情况没有任何影响。

    更多更新 -

    一台机器通过一个连接连接到RabbitMQ。一个连接可以有多个 channel 。一个 channel 可以有多个消费者。所以从逻辑上讲,可以有不同的机器连接到 RabbitMQ 并且有多个 channel 和消费者在同一个队列上监听。您可以同时为 channel (使用 channel.basicQos(x, true) )和 channel 内的消费者(使用 channel.basicQos(x, false) )设置 QOS 限制。限制 0 表示无限制。显然,这些限制适用于 channel 实例。驻留在不同 channel 实例(在同一台机器或不同机器上)的所有消费者都有自己的限制(默认或通过 QOS 方法明确设置)。

    关于synchronization - 如何将消费者与rabbitmq同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47678522/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com