gpt4 book ai didi

具有持久 session 的 HiveMQ 共享订阅

转载 作者:行者123 更新时间:2023-12-01 13:38:33 24 4
gpt4 key购买 nike

尝试结合 HiveMQ 的两个特性:共享订阅和持久 session 。

如果创建了一个非常简单的消息生成器。和一个非常简单的消费者。当运行多个消费者时,所有消费者都会收到所有消息。

将消费者的 clearSession 设置为 'false' 后,当运行消费者并重启消费者时,消费者在未连接时也会收到消息。太棒了。

现在将其与共享订阅功能相结合。仅使用共享订阅时,clearSession 为“true”。当运行多个消费者时,一条消息只会被一个消费者接收。它应该是循环的,情况也是如此,但是一旦您停止消费者,消息就不再是循环的,但是其中一个消费者收到的消息明显多于其他消费者。

如果我现在再次启用持久 session ,clearSession 为“false”,并启动共享订阅消费者,消费者将再次开始接收所有消息,而不是只将消息传递给一个客户端。

这里有什么问题?这是 HiveMQ 中的错误吗?persistent session 和 shared subscription 不能一起用吗?那真是太可惜了。

2017 年 2 月 15 日更新正如@fraschbi 所建议的那样,我清除了所有数据并再次重新测试了与持久 session 使用者的共享订阅。似乎有效!

但奇怪的是,错过的消息只有在第一个消费者重新连接后才会收到。所有消费者都有相同的代码,他们只是从不同的 clientId 参数开始。请参阅下面的代码。我的测试序列:

  • 启动消费者 1:所有消息都发送给这个消费者。
  • 启动消费者 2:每个消费者接收每隔一条消息。
  • 启动消费者 3:每个消费者收到三分之一的消息。
  • 停止 consumer1:现在 consumer2 和 3 接收每隔一条消息。 (不知道为什么我昨天看到这种分布不均的情况,但可能正如@fraschbi 提到的那样,因为我正在重复使用 clientId,并且没有取消订阅或正确断开连接)
  • 现在正在停止 consumer2:consumer3 现在收到的所有消息。
  • 停止消费者 3:不再收到任何消息。
  • 重启消费者 3:它继续生产者发送的第一条消息。 它不接收丢失的消息
  • 重启消费者2:消息再次均匀分布。
  • 重启消费者 1:这个消费者现在接收所有丢失的消息,然后继续接收每 3 条消息中的 1 条消息。

所以我的新问题是:为什么只有第一个消费者收到丢失的消息?

注意:这里的技巧仍然是停止客户端时不要取消订阅,因为那样订阅/持久化设置就丢失了!

生产者.scala

object Producer extends App {

val topic = args(0)
val brokerUrl = "tcp://localhost:1883"

val clientId = UUID.randomUUID().toString

val client = new MqttClient(brokerUrl, clientId)
client.connect()
val theTopic = client.getTopic(topic)

var count = 0

sys.addShutdownHook {
println("Disconnecting client...")
client.disconnect()
println("Disconnected.")
}

while(true) {
val msg = new MqttMessage(s"Message: $count".getBytes())
theTopic.publish(msg)
println(s"Published: $msg")

Thread.sleep(1000)

count = count + 1
}
}

消费者.scala

object Consumer extends App {

val topic = args(0)
val brokerUrl = "tcp://localhost:1883"

val clientId = args(1)
// val clientId = UUID.randomUUID().toString

val client = new MqttClient(brokerUrl, clientId)
client.setCallback(new MqttCallback {
override def deliveryComplete(token: IMqttDeliveryToken) = ()

override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")

override def connectionLost(cause: Throwable) = println("Connection lost")
})

println(s"Start $clientId consuming from topic: $topic")
val options = new MqttConnectOptions()
options.setCleanSession(false);

client.connect(options)
client.subscribe(topic)

sys.addShutdownHook {
println("Disconnecting client...")
// client.unsubscribe(topic)
client.disconnect()
println("Disconnected.")
}


while(true) {

}

}

最佳答案

我将尝试分别回答您遇到的两个问题。

It should be round-robin and that is also the case, but as soon as you stop a consumer the messages a no longer round-robin but one of the consumers gets significantly more messages then the other(s).

在为共享订阅分发消息时,HiveMQ 确实更喜欢在线客户端。

If I now enable persistent session again, clearSession is 'false', and start the shared subscription consumers, the consumers start to receive all messages again instead of the message is just delivered to one client.

在问题的开头,您说您正在将具有 cleanSession=false 的客户端连接到代理并订阅主题。 (听起来好像您只使用了一个主题。)在重新连接 cleanSession=false 和共享订阅之前,您是否有可能不取消订阅这些客户端?在这种情况下,您场景第一步中的订阅仍会保留给这些客户端,并且自然而然地,他们每个人都会收到消息。

编辑:

So my new question is: why does only the 1st consumer receive the lost messages?

来自 HiveMQ 用户指南:

When a clients offline queue is full, the message for that client won’t be dropped but queued for the next offline client in a shared subscription group.

当所有客户端都离线时,分发不再是轮询。因此,您描述的场景在预期行为范围内。

消息队列的默认值为 1000。因此您可以在客户端离线时发送超过 1000 条消息,或者减小消息队列的大小。

...
<persistence>
<queued-messages>

<max-queued-messages>50</max-queued-messages>

</queued-messages>
...
</persistence>
...

将此添加到您的 config.xml 以减小消息队列大小。

关于具有持久 session 的 HiveMQ 共享订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42231122/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com