gpt4 book ai didi

Golang Nats 订阅问题

转载 作者:IT王子 更新时间:2023-10-29 01:44:38 25 4
gpt4 key购买 nike

我目前从事微服务架构方面的工作。在我将 NATS 插入我的项目之前,我想用它测试一些简单的场景。

在一个场景中,我有一个简单的发布者,它通过在 localhost:4222 上运行的基本 Nats 服务器在 for 循环中发布 100.000 条消息。

最大的问题是订阅者。当他收到 30.000 - 40.000 条消息时,我的整个 main.go 程序和所有其他 go 例程就会停止,什么也不做。我可以用 ctrl + c 退出。但发布者仍在继续发送消息。当我打开一个新终端并启动订阅者的一个新实例时,一切都再次运行良好,直到订阅者收到大约 30000 条消息。最糟糕的是,服务器上甚至没有出现一个错误,也没有日志,所以我不知道发生了什么。

之后,我尝试用 QueueSubscribe 方法替换 Subscribe 方法,一切正常。

订阅和队列订阅的主要区别是什么?

NATS-Streaming 是更好的机会吗?或者在哪些情况下我更喜欢 Streaming 而在哪些情况下我更喜欢标准的 NATS-Server

这是我的代码:

发布者:

package main

import (
"fmt"
"log"
"time"

"github.com/nats-io/go-nats"
)

func main() {
go createPublisher()

for {

}
}

func createPublisher() {

log.Println("pub started")

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()

msg := make([]byte, 16)

for i := 0; i < 100000; i++ {
nc.Publish("alenSub", msg)
if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond)
}

log.Println("pub finish")

nc.Flush()

}

订阅者:

package main

import (
"fmt"
"log"
"time"

"github.com/nats-io/go-nats"
)

var received int64

func main() {
received = 0

go createSubscriber()
go check()

for {

}
}

func createSubscriber() {

log.Println("sub started")

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()

nc.Subscribe("alenSub", func(msg *nats.Msg) {
received++
})
nc.Flush()

for {

}
}

func check() {
for {
fmt.Println("-----------------------")
fmt.Println("still running")
fmt.Println("received", received)
fmt.Println("-----------------------")
time.Sleep(time.Second * 2)
}
}

最佳答案

无限的 for 循环可能会使垃圾收集器饿死:https://github.com/golang/go/issues/15442#issuecomment-214965471

我仅通过运行发布者就能够重现该问题。要解决这个问题,我建议使用 sync.WaitGroup。以下是我如何更新评论中链接的代码以完成它:

package main

import (
"fmt"
"log"
"sync"
"time"

"github.com/nats-io/go-nats"
)

// create wait group
var wg sync.WaitGroup

func main() {
// add 1 waiter
wg.Add(1)
go createPublisher()

// wait for wait group to complete
wg.Wait()
}

func createPublisher() {

log.Println("pub started")
// mark wait group done after createPublisher completes
defer wg.Done()

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()

msg := make([]byte, 16)

for i := 0; i < 100000; i++ {
if errPub := nc.Publish("alenSub", msg); errPub != nil {
panic(errPub)
}

if (i % 100) == 0 {
fmt.Println("i", i)
}
time.Sleep(time.Millisecond * 1)
}

log.Println("pub finish")

errFlush := nc.Flush()
if errFlush != nil {
panic(errFlush)
}

errLast := nc.LastError()
if errLast != nil {
panic(errLast)
}

}

我建议以类似方式更新上述订阅者代码。

SubscribeQueueSubscriber 之间的主要区别在于,在Subscribe 中,所有订阅者都从中发送所有消息。在 QueueSubscribe 中,每条消息仅发送 QueueGroup 中的一个订阅者。

有关 NATS Streaming 附加功能的一些详细信息,请参见此处: https://nats.io/documentation/streaming/nats-streaming-intro/

我们看到 NATS 和 NATS Streaming 被用于从数据管道到控制平面的各种用例。您的选择应由您的用例需求驱动。

关于Golang Nats 订阅问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45886359/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com