gpt4 book ai didi

go - ZMQ 无法接收来自多个发布者的消息

转载 作者:数据小太阳 更新时间:2023-10-29 03:21:01 28 4
gpt4 key购买 nike

我正在实现 ZMQ 的 Espresso 模式。

我想通过一个代理把多个订阅者和多个发布者连接起来(多个订阅者 <-> 代理 <-> 多个发布者)。

但是,代理中的监听器只接收来自一个发布者的消息。因此,订阅者只能从那个特定的发布者那里接收。我无法弄清楚我的代码有什么问题。

package playground

import (
zmq "github.com/pebbe/zmq4"

"fmt"
"math/rand"
"time"
"testing"
)

// subscriber_thread opens a SUB socket, connects it to tcp://localhost:6001,
// installs an empty subscription filter and then prints every message it
// receives, tagged with id. It loops forever; a failed receive panics.
//
// The results of NewSocket, Connect and SetSubscribe are discarded here.
func subscriber_thread(id int) {
	sock, _ := zmq.NewSocket(zmq.SUB)
	sock.Connect("tcp://localhost:6001")
	sock.SetSubscribe("")
	defer sock.Close()

	for {
		frames, recvErr := sock.RecvMessage(0)
		if recvErr == nil {
			fmt.Println("subscriber id:", id, "received:", frames)
			continue
		}
		panic(recvErr)
	}
}

// publisher_thread opens a PUB socket, binds it to tcp://*:6000 and then,
// every 100ms, publishes a message of the form "<letter>-<5 digits>" where the
// letter is 'A' offset by n and the digits are a random number below 100000.
// It never returns; a failed send panics.
//
// NOTE(review): the results of NewSocket and Bind are discarded, and every
// goroutine running this function binds the same endpoint — confirm whether
// binds after the first one actually succeed.
func publisher_thread(n int) {
	pub, _ := zmq.NewSocket(zmq.PUB)
	pub.Bind("tcp://*:6000")

	for {
		payload := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))
		if _, sendErr := pub.SendMessage(payload); sendErr != nil {
			panic(sendErr)
		}
		fmt.Println("publisher sent:", payload)
		time.Sleep(100 * time.Millisecond) // pause for a tenth of a second
	}
}

// listener_thread prints everything that flows through the proxy. It binds a
// PAIR socket to inproc://pipe and writes each message arriving on it in
// quoted form. (In CZMQ the pipe is a pair of ZMQ_PAIR sockets connecting
// attached child threads; other language bindings may differ.)
//
// The function returns as soon as a receive fails. The results of NewSocket
// and Bind are discarded here.
func listener_thread() {
	pair, _ := zmq.NewSocket(zmq.PAIR)
	pair.Bind("inproc://pipe")

	// Echo each captured message until receiving is interrupted.
	for {
		frames, recvErr := pair.RecvMessage(0)
		if recvErr != nil {
			return // Interrupted
		}
		fmt.Printf("%q\n", frames)
	}
}

// TestZmqEspresso starts three publishers, two subscribers and the listener as
// goroutines, waits 100ms, and then runs a proxy between an XSUB socket
// connected to tcp://localhost:6000 and an XPUB socket bound to tcp://*:6001,
// with a PAIR socket connected to inproc://pipe as the capture socket.
//
// All socket setup errors and the result of Proxy are discarded here.
func TestZmqEspresso(t *testing.T) {
	// Publishers 0..2 (letters A..C).
	for n := 0; n < 3; n++ {
		go publisher_thread(n)
	}

	// Subscribers with ids 1 and 2.
	for id := 1; id <= 2; id++ {
		go subscriber_thread(id)
	}

	go listener_thread()

	time.Sleep(100 * time.Millisecond)

	// Publisher-facing side of the proxy.
	frontend, _ := zmq.NewSocket(zmq.XSUB)
	frontend.Connect("tcp://localhost:6000")

	// Subscriber-facing side of the proxy.
	backend, _ := zmq.NewSocket(zmq.XPUB)
	backend.Bind("tcp://*:6001")

	// Capture socket feeding listener_thread.
	capture, _ := zmq.NewSocket(zmq.PAIR)
	capture.Connect("inproc://pipe")

	zmq.Proxy(frontend, backend, capture)

	fmt.Println("interrupted")
}

最佳答案

我找到了解决方案:XPUB/XSUB 套接字应该绑定(bind)到端点,而 PUB 和 SUB worker 应该连接(connect)到该端点。

下面是可以正常工作的代码:

package playground

import (
zmq "github.com/pebbe/zmq4"

"fmt"
"log"
"math/rand"
"testing"
"time"
)

// subscriber_thread opens a SUB socket, connects it to tcp://localhost:6001,
// installs an empty subscription filter and then prints every message it
// receives, tagged with id. It loops forever; any socket error panics.
func subscriber_thread(id int) {
	sub, err := zmq.NewSocket(zmq.SUB)
	if err != nil {
		panic(err)
	}
	if err = sub.Connect("tcp://localhost:6001"); err != nil {
		panic(err)
	}
	if err = sub.SetSubscribe(""); err != nil {
		panic(err)
	}
	defer sub.Close()

	for {
		frames, recvErr := sub.RecvMessage(0)
		if recvErr == nil {
			fmt.Println("subscriber id:", id, "received:", frames)
			continue
		}
		panic(recvErr)
	}
}

// publisher_thread opens a PUB socket, connects it to tcp://localhost:6000
// and then, every 100ms, publishes a message of the form "<letter>-<5 digits>"
// where the letter is 'A' offset by n and the digits are a random number
// below 100000. It never returns normally; any socket error panics.
func publisher_thread(n int) {
	publisher, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		panic(err)
	}
	// Release the socket if the loop below panics, as subscriber_thread does.
	defer publisher.Close()

	// Connect rather than Bind: several publishers run at once, so each one
	// connects to the single endpoint that the proxy side binds.
	if err := publisher.Connect("tcp://localhost:6000"); err != nil {
		panic(err)
	}

	for {
		s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))
		if _, err := publisher.SendMessage(s); err != nil {
			panic(err)
		}
		fmt.Println("publisher sent:", s)
		time.Sleep(100 * time.Millisecond) // Wait for 1/10th second
	}
}

// listener_thread prints everything that flows through the proxy. It binds a
// PAIR socket to inproc://pipe and writes each message arriving on it in
// quoted form. (In CZMQ the pipe is a pair of ZMQ_PAIR sockets connecting
// attached child threads; other language bindings may differ.)
//
// Socket creation and bind failures panic, matching the other worker
// functions in this file. The function returns as soon as a receive fails.
func listener_thread() {
	pipe, err := zmq.NewSocket(zmq.PAIR)
	if err != nil {
		panic(err)
	}
	defer pipe.Close()

	if err := pipe.Bind("inproc://pipe"); err != nil {
		panic(err)
	}

	// Print everything that arrives on pipe
	for {
		msg, err := pipe.RecvMessage(0)
		if err != nil {
			return // Interrupted
		}
		fmt.Printf("%q\n", msg)
	}
}

// TestZmqEspresso starts three publishers, two subscribers and the listener as
// goroutines, waits 100ms, and then runs a proxy between an XSUB socket bound
// to tcp://*:6000 and an XPUB socket bound to tcp://*:6001, with a PAIR socket
// connected to inproc://pipe as the capture socket.
//
// Any socket setup failure, or an error returned by Proxy, fails the test.
func TestZmqEspresso(t *testing.T) {
	log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)

	go publisher_thread(0)
	go publisher_thread(1)
	go publisher_thread(2)

	go subscriber_thread(1)
	go subscriber_thread(2)

	go listener_thread()

	time.Sleep(100 * time.Millisecond)

	// The proxy side binds both TCP endpoints; the PUB and SUB workers
	// connect to them.
	subscriber, err := zmq.NewSocket(zmq.XSUB)
	if err != nil {
		t.Fatalf("creating XSUB socket: %v", err)
	}
	defer subscriber.Close()
	if err := subscriber.Bind("tcp://*:6000"); err != nil {
		t.Fatalf("binding XSUB socket: %v", err)
	}

	publisher, err := zmq.NewSocket(zmq.XPUB)
	if err != nil {
		t.Fatalf("creating XPUB socket: %v", err)
	}
	defer publisher.Close()
	if err := publisher.Bind("tcp://*:6001"); err != nil {
		t.Fatalf("binding XPUB socket: %v", err)
	}

	// Capture socket: connects to the endpoint that listener_thread binds.
	listener, err := zmq.NewSocket(zmq.PAIR)
	if err != nil {
		t.Fatalf("creating capture PAIR socket: %v", err)
	}
	defer listener.Close()
	if err := listener.Connect("inproc://pipe"); err != nil {
		t.Fatalf("connecting capture PAIR socket: %v", err)
	}

	if err := zmq.Proxy(subscriber, publisher, listener); err != nil {
		t.Fatalf("running proxy: %v", err)
	}

	fmt.Println("interrupted")
}

关于go - ZMQ 无法接收来自多个发布者的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53973010/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com