gpt4 book ai didi

go - Go 中的 RabbitMQ 消费者

转载 作者:IT王子 更新时间:2023-10-29 01:05:52 28 4
gpt4 key购买 nike

我正在尝试用 Go 编写 RabbitMQ 消费者。假设一次从队列中取出 5 个对象并处理它们。此外,如果成功处理,假设确认发送到死信队列 5 次然后丢弃,它应该无限运行并处理消费者的取消事件。我有几个问题:

  1. 在 RabbitMq-go 中是否有 BasicConsumerEventingBasicConsumer 的概念 Reference
  2. RabbitMQ 中的Model 是什么?它在 RabbitMq-go 中吗?
  3. 如何在死信队列失败时将对象发送到死信队列并在 ttl 之后再次将它们重新排队
  4. 下面代码中ch.Consume函数中consumerTag参数的意义是什么
  5. 对于这种情况,我们应该使用 channel.Get() 还是 channel.Consume()

为了满足上述要求,我需要对以下代码进行哪些更改。我问这个是因为我找不到合适的 RabbitMq-Go 文档。

   func main() {

consumer()
}

func consumer() {

objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)


ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)

if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()

log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever

}

编辑后的问题:

我已按照链接中的建议延迟处理消息 link1 link2 .但问题是即使在 ttl 之后,消息也会从死信队列返回到它们的原始队列。我正在使用 RabbitMQ 3.0.0。谁能指出问题所在?

最佳答案

Is there any concept of BasicConsumer vs EventingBasicConsumer in RabbitMq-go Reference?

不完全是,但是 Channel.GetChannel.Consume 调用服务于类似的概念。使用 Channel.Get,您有一个非阻塞调用,如果有任何可用消息,它会获取第一条消息,或者返回 ok=false。使用 Channel.Consume,排队的消息被传送到 channel 。

What is Model in RabbitMQ and is it there in RabbitMq-go?

如果您指的是 C# RabbitMQ 中的 IModelConnection.CreateModel,那是 C# 库中的内容,而不是 RabbitMQ 本身的内容。它只是试图从 RabbitMQ“ channel ”术语中抽象出来,但它从未流行起来。

How to send the objects when failed to dead-letter queue and again re-queue them after ttl

使用 delivery.Nack使用 requeue=false 的方法。

What is the significance of consumerTag argument in the ch.Consume function in the below code

ConsumerTag 只是一个消费者标识符。它可用于取消 channel channel.Cancel ,并确定负责送货的消费者。使用 channel.Consume 传送的所有消息都将设置 ConsumerTag 字段。

Should we use the channel.Get() or channel.Consume() for this scenario?

我认为 channel.Get() 几乎永远不会优于 channel.Consume()。使用 channel.Get,您将轮询队列并浪费 CPU 无所事事,这在 Go 中没有意义。

What are the changes i need to make in the below code to meet above requirement.

  1. 由于您一次批处理 5 个,您可以有一个从消费者 channel 接收的 goroutine,一旦它收到 5 个交付,您就调用另一个函数来处理它们。

  2. 要确认或发送到死信队列,您将使用 delivery.Ackdelivery.Nack功能。您可以使用 multiple=true 并为批处理调用一次。一旦消息进入死信队列,您必须检查 delivery.Headers["x-death"] header 以了解它被死信的次数并调用 delivery.Reject当它已经重试 5 次时。

  3. 使用 channel.NotifyCancel处理取消事件。

关于go - Go 中的 RabbitMQ 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36419994/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com