gpt4 book ai didi

go - 蚊子的持久性不一致

转载 作者:行者123 更新时间:2023-12-03 10:10:16 26 4
gpt4 key购买 nike

我在mosquitto上看到消息持久性和qos = 2的消息传递不一致。我做错了什么吗?
我有一个简单的测试应用程序,可以使用clientId =“receive-client”注册要消费的主题,但是会立即断开连接。然后,它将作为clientId =“send-client”连接并发布10条消息,即“消息#1” ...“消息#10”。然后断开连接,等待五秒钟,然后在打印和计数收到的消息时再次与“receive-client”连接以消耗。
结果不一致。有时我会收到6条消息,有时会收到8条消息。典型的输出如下所示:

WARN[0005] GOT A MESSAGE:message #1                     
WARN[0005] GOT A MESSAGE:message #2
WARN[0005] GOT A MESSAGE:message #3
WARN[0005] GOT A MESSAGE:message #4
WARN[0005] GOT A MESSAGE:message #5
WARN[0005] GOT A MESSAGE:message #6
WARN[0005] GOT A MESSAGE:message #7
WARN[0005] GOT A MESSAGE:message #8
WARN[0305] PAUSE
WARN[0605] received message count=8
我的版本信息为1.4.15。我的mosquitto.conf是:
pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

allow_anonymous false
password_file /etc/mosquitto/passwd

log_dest file /var/log/mosquitto/mosquitto.log
最初,/var/lib/mosquitto/mosquitto.db在运行几次迭代后才会显示。我的测试应用程序在这里:
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"time"
)

var receivedMsg int

func Persist() {
const TOPIC = "test"
const URL = "tcp://localhost:1883"
const USERNAME = "myuser"
const PASSWORD = "mypassword"

defer printReceived()

options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
options.SetCleanSession(false)
options.SetConnectRetry(true)
options.SetConnectRetryInterval(10 * time.Millisecond)

// register the receive client with broker / TOPIC
// to be sure the broker knows it needs to save our messages
// to deliver at a later time
options.SetClientID("receive-client")
client := mqtt.NewClient(options)
token := client.Connect()
token.Wait()
if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() && token.Error() != nil {
panic(token.Error())
}
client.Disconnect(0)

// connect with send client and send 10 messages
options.SetClientID("send-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()

client.Publish(TOPIC, 2, false, "message #1")
client.Publish(TOPIC, 2, false, "message #2")
client.Publish(TOPIC, 2, false, "message #3")
client.Publish(TOPIC, 2, false, "message #4")
client.Publish(TOPIC, 2, false, "message #5")
client.Publish(TOPIC, 2, false, "message #6")
client.Publish(TOPIC, 2, false, "message #7")
client.Publish(TOPIC, 2, false, "message #8")
client.Publish(TOPIC, 2, false, "message #9")
client.Publish(TOPIC, 2, false, "message #10")
client.Disconnect(4)
time.Sleep(5* time.Second)

// subscribe again and try to retrieve the messages we missed
options.SetClientID("receive-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()

if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() && token.Error() != nil {
panic(token.Error())
}

time.Sleep(300 * time.Second)
log.Warn("PAUSE")
time.Sleep(300 * time.Second)
}

func consume1(client mqtt.Client, msg mqtt.Message) {
receivedMsg++
log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}

func consume2(client mqtt.Client, msg mqtt.Message) {
receivedMsg++
log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}

func printReceived() {
log.Warn("received message count=", receivedMsg)
}

最佳答案

要在QOS 2上进行发布是一个多步骤过程,因此最可能的原因是您在所有消息实际完成向代理的发布之前就断开了发布客户端的连接。
您可能应该进行循环发布,并使用对client.publish()的调用中返回的 token 来等待它完成,然后再断开与客户端的连接。
例如如示例所示:

//Publish 5 messages to /go-mqtt/sample at qos 1 and wait for the receipt
//from the server after sending each message
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}

关于go - 蚊子的持久性不一致,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64576257/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com