gpt4 book ai didi

rabbitmq - 如何在 RabbitMQ 中重新排队消息

转载 作者:行者123 更新时间:2023-12-03 09:17:55 26 4
gpt4 key购买 nike

消费者收到消息后,消费者/ worker 进行一些验证,然后调用 Web 服务。在这个阶段,如果发生任何错误或验证失败,我们希望将消息放回最初使用它的队列。

我已阅读 RabbitMQ 文档。但我对拒绝、nack 和取消方法之间的区别感到困惑。

最佳答案

简短回答:

要重新排队特定消息,您可以同时选择 basic.rejectbasic.nackmultiple标志设置为假。
basic.consume如果您使用消息确认并且在特定时间消费者有未确认的消息并且消费者在没有确认它们的情况下退出,则调用也可能导致消息重新传递。
basic.recover将在特定 channel 上重新传递所有未确认的消息。

长答案:

basic.reject basic.nack 两者都用于相同的目的 - 丢弃或重新排队特定消费者无法处理的消息(在给定时刻,在某些条件下或根本不处理)。它们之间的主要区别在于 basic.nack支持批量消息处理,而 basic.reject没有。

Negative Acknowledgements 中描述了这种差异。 RabbitMQ 官方网站上的文章:

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.

To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.

To reject messages in bulk, clients set the multiple flag of the basic.nack method to true. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag field of the basic.nack method. In this respect, basic.nack complements the bulk acknowledgement semantics of basic.ack.



请注意, basic.nack方法是 RabbitMQ 特定的扩展,而 basic.reject方法是 AMQP 0.9.1 规范的一部分。

至于 basic.cancel 方法,它用于通知服务器客户端停止消息消费。请注意,该客户端可能会收到 basic.cancel 之间的任意消息编号。方法发送接收 cancel-ok回复。如果客户端使用消息确认并且它有任何未确认的消息,它们将被移回最初使用它们的队列。

basic.recover 在 RabbitMQ 中有一些限制:它
- basic.recover with requeue=false
- basic.recover synchronicity

除了勘误表, according to RabbitMQ specs basic.recover有部分支持(不支持使用 requeue=false 进行恢复。)

关于 basic.consume 的注意事项:

basic.consume 在没有自动确认 ( no­ack=false) 的情况下启动,并且有一些未确认的消息未确认消息,然后当消费者被取消(死亡、 fatal error 、异常等)时,将重新传递未决消息。从技术上讲,在消费者释放它们(ack/nack/reject/recover)之前,不会处理待处理的消息(甚至是死信)。只有在那之后,它们才会被处理(例如,死信)。

例如,假设我们最初连续发布 5 条消息:
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)

然后消费其中的3个,但不确认它们,然后取消消费者。我们会遇到这样的情况:
Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)

其中星号 ( * ) 指出 redelivered标志设置为 true .

假设我们有死信交换集和死信消息队列的情况
Exchange(e-main)                                   Exchange(e-dead) 
Queue(main){x-dead-letter-exchange: "e-dead"} Queue(dead)

并假设我们使用 expire 发布 5 条消息属性设置为 5000 (5 秒):
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)

然后我们使用来自 main 的 3 条消息排队并保持 10 秒:
Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)

其中感叹号 ( !) 代表未确认的消息。此类消息无法传递给任何消费者,并且通常无法在管理面板中查看。但是让我们取消消费者,记住,它仍然持有 3 条未确认的消息:
Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)

因此,现在头部中的 3 条消息被放回原始队列,但由于它们设置了每个消息的 TTL,它们被死信到死信队列的尾部(当然,通过死信交换)。

附:

消费消息又名监听新消息在某种程度上不同于直接队列访问(在不关心其他消息的情况下获取一条或多条消息)。见 basic.get 方法说明更多。

关于rabbitmq - 如何在 RabbitMQ 中重新排队消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24107913/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com