gpt4 book ai didi

go - 更正 Go/RabbitMQ 方法以 "pop"队列中的一条消息?

转载 作者:IT王子 更新时间:2023-10-29 01:08:19 24 4
gpt4 key购买 nike

我的第一个问题实际上是一个设计问题。这是我第一次编写使用队列的服务,我也是 Go 的新手。我正在尝试确定我是否应该以这样一种方式编写我的工作人员,即它只是从队列中弹出一条消息,处理它,然后消失。对于像 Kubernetes 这样的东西,这似乎相当微不足道。

或者我是否应该让一个长寿的工作人员不断等待新消息,但如果它死了(由于错误或意外),它会重新启动?

我问这个问题的原因是,为了实现前者,感觉有点“搞砸了”,因为我必须使用来自 streadway/amqp 的通用 go AMQP 库编写以下内容(阅读评论):

// Pop will extract a message from the AMQP queue
func (v *Queue) Pop() (data []byte, err error) {
msgs, err := v.Channel.Consume(
v.QueueName, // queue
v.ConsmerID, // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}

// We have to use for .. range because Consume returns
// "<-chan Delivery" but if we only want ONE message popped off
// we return on the first one
for data := range msgs {
return data.Body, nil
}

// We should never get this far...
return nil, errors.New("Something went wrong")
}

此外什么是 <-chan Delivery在这种情况下?它看起来像是某种可以插入的“流”或对象。有没有办法不必为这些数据类型编写 for 循环?

编辑:我还发现这段代码似乎会使整个队列出列,即使它只执行一次 for 循环迭代(如上面的代码所示)。我也不确定为什么会这样?

相关代码链接:

最佳答案

简单地从 <-chan Delivery 中取出一个对象, 不要使用 range循环,但是 channel operator <- :

data := <- msgs
return data.Body, nil

至于为什么您一获取一条消息就清空整个队列:这很可能是由于 Consumer prefetch .在使用消息时,客户端实际上不会一个接一个地从代理中弹出它们,而是以可配置大小的批处理弹出(如果我没记错的话,默认情况下约为 32 或 64 条消息)。一旦经纪人向您的消费者发布了这批消息,它们就会在您的 msgs 中。 channel ;如果您在收到第一条消息后不再从该 channel 阅读,则其余消息将消失(至少在启用 auto-ack 的情况下——否则,它们将在 channel 关闭后重新排队)。

要一次只获取一条消息,请使用 channel 的 QoS function (第一个参数是预取计数):

err := v.Channel.Qos(1, 0, false)

关于go - 更正 Go/RabbitMQ 方法以 "pop"队列中的一条消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53678038/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com