gpt4 book ai didi

go - RabbitMQ 消费者性能 - 预取与并发

转载 作者:数据小太阳 更新时间:2023-10-29 03:18:34 24 4
gpt4 key购买 nike

我有一个 Go 应用程序处理来自单个 RabbitMQ 队列的事件。我用 github.com/streadway/amqp RabbitMQ 客户端库。

Go 应用程序在大约 2-3 秒内处理每条消息。如果我从内存中输入消息,则可以并行处理 ~1000 条甚至更多消息。但是,不幸的是,RabbitMQ 的性能更差。所以,我想更快地使用队列中的消息。

因此,问题是:如何使用 github.com/streadway/amqp 以最有效的方式使用消息?

据我了解,有两种方法:

  1. 设置高预取

    https://godoc.org/github.com/streadway/amqp#Channel.Qos .

    使用单个消费者协程

    示例代码:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

ch.Qos(
10000, // prefetch count
0, // prefetch size
false, // global
)

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // NO auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)

for d := range msgs {
log.Printf("Received a message: %s", d.Body)
err:= processMessage(d)
if err != nil {
log.Printf("%s : while consuming task", err)
d.Nack(false, true)
} else {
d.Ack(false)
}
continue // consume other messages
}

但是 processMessage 会在这里并行调用吗?

  1. 产生许多 channel 并使用多个消费者
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
var i = 0
for i = 0; i<=100; i++ {
go func(){
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

ch.Qos(
10, // prefetch count
0, // prefetch size
false, // global
)

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // NO auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)

for d := range msgs {
log.Printf("Received a message: %s", d.Body)
err:= processMessage(d)
if err != nil {
log.Printf("%s : while consuming task", err)
d.Nack(false, true)
} else {
d.Ack(false)
}
continue // consume other messages
}
}()
}

但这是对 RAM 友好的方法吗?对 RabbitMQ 而言,为每个工作人员生成一个新 channel 不是很引人注目吗?

那么,问题是,哪种变体更好?更好的性能、更好的内存使用等。

那么,在这里 RabbitMQ 的最佳用法是什么?

更新:目前,我遇到了一个情况,我的工作人员在 VPS 上消耗了所有 RAM,并被 OOM 杀死。我使用了第二种方法。因此,对我来说更好的是能够让我的工作人员在工作几分钟后不会被 OOM 杀死。

更新2:当worker处理消息失败时nack,当worker处理消息时ack非常重要。所有消息都必须处理(它的客户分析),但有时工作人员无法处理它,因此它必须 nack 消息将其传递给其他工作人员(目前,一些第 3 方 api 有时用于处理消息简单地返回 503 状态代码,在这种情况下,消息应该传递给其他工作人员或重试)。因此,很遗憾,使用auto-ack 不是一种选择。

最佳答案

我想每个 processMessage() 都在一个新的 goroutine 中运行。

Which variant is better?

我更喜欢第一个,因为打开/关闭 channel 有点昂贵(2 + 2 TCP 数据包)。我认为你的 OOM 问题与太多的 gorutine 无关,gorutine 很轻,大约 5KB。所以问题可能是由您的 processMessage() 引起的。

我认为 github.com/streadway/amqp channel 消费操作是 thread/gorutine-safe ,所以如果你只是做一些消费操作,那么在 goruntine 之间共享 channel 是安全的。

关于go - RabbitMQ 消费者性能 - 预取与并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57375790/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com