- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
尝试结合 HiveMQ 的两个特性:共享订阅和持久 session 。
如果创建了一个非常简单的消息生成器。和一个非常简单的消费者。当运行多个消费者时,所有消费者都会收到所有消息。
将消费者的 clearSession 设置为 'false' 后,当运行消费者并重启消费者时,消费者在未连接时也会收到消息。太棒了。
现在将其与共享订阅功能相结合。仅使用共享订阅时,clearSession 为“true”。当运行多个消费者时,一条消息只会被一个消费者接收。它应该是循环的,情况也是如此,但是一旦您停止消费者,消息就不再是循环的,但是其中一个消费者收到的消息明显多于其他消费者。
如果我现在再次启用持久 session ,clearSession 为“false”,并启动共享订阅消费者,消费者将再次开始接收所有消息,而不是只将消息传递给一个客户端。
这里有什么问题?这是 HiveMQ 中的错误吗?persistent session 和 shared subscription 不能一起用吗?那真是太可惜了。
2017 年 2 月 15 日更新正如@fraschbi 所建议的那样,我清除了所有数据并再次重新测试了与持久 session 使用者的共享订阅。似乎有效!
但奇怪的是,错过的消息只有在第一个消费者重新连接后才会收到。所有消费者都有相同的代码,他们只是从不同的 clientId 参数开始。请参阅下面的代码。我的测试序列:
所以我的新问题是:为什么只有第一个消费者收到丢失的消息?
注意:这里的技巧仍然是停止客户端时不要取消订阅,因为那样订阅/持久化设置就丢失了!
生产者.scala
object Producer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.connect()
val theTopic = client.getTopic(topic)
var count = 0
sys.addShutdownHook {
println("Disconnecting client...")
client.disconnect()
println("Disconnected.")
}
while(true) {
val msg = new MqttMessage(s"Message: $count".getBytes())
theTopic.publish(msg)
println(s"Published: $msg")
Thread.sleep(1000)
count = count + 1
}
}
消费者.scala
object Consumer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = args(1)
// val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.setCallback(new MqttCallback {
override def deliveryComplete(token: IMqttDeliveryToken) = ()
override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")
override def connectionLost(cause: Throwable) = println("Connection lost")
})
println(s"Start $clientId consuming from topic: $topic")
val options = new MqttConnectOptions()
options.setCleanSession(false);
client.connect(options)
client.subscribe(topic)
sys.addShutdownHook {
println("Disconnecting client...")
// client.unsubscribe(topic)
client.disconnect()
println("Disconnected.")
}
while(true) {
}
}
最佳答案
我将尝试分别回答您遇到的两个问题。
It should be round-robin and that is also the case, but as soon as you stop a consumer the messages a no longer round-robin but one of the consumers gets significantly more messages then the other(s).
在为共享订阅分发消息时,HiveMQ 确实更喜欢在线客户端。
If I now enable persistent session again, clearSession is 'false', and start the shared subscription consumers, the consumers start to receive all messages again instead of the message is just delivered to one client.
在问题的开头,您说您正在将具有 cleanSession=false
的客户端连接到代理并订阅主题。 (听起来好像您只使用了一个主题。)在重新连接 cleanSession=false
和共享订阅之前,您是否有可能不取消订阅这些客户端?在这种情况下,您场景第一步中的订阅仍会保留给这些客户端,并且自然而然地,他们每个人都会收到消息。
编辑:
So my new question is: why does only the 1st consumer receive the lost messages?
来自 HiveMQ 用户指南:
When a clients offline queue is full, the message for that client won’t be dropped but queued for the next offline client in a shared subscription group.
当所有客户端都离线时,分发不再是轮询。因此,您描述的场景在预期行为范围内。
消息队列的默认值为 1000。因此您可以在客户端离线时发送超过 1000 条消息,或者减小消息队列的大小。
...
<persistence>
<queued-messages>
<max-queued-messages>50</max-queued-messages>
</queued-messages>
...
</persistence>
...
将此添加到您的 config.xml
以减小消息队列大小。
关于具有持久 session 的 HiveMQ 共享订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42231122/
我是Hibernate的新手。当我保存特定实体时,它将从现有实体中重写数据。 我将ID用作自动生成,如下所示: @Id @GeneratedValue(strategy=GenerationType.
我正在尝试以连续模式使用CouchDB更改通知API,所以我想发送此消息 _changes?feed = continuous?include_docs = true作为GET请求到达我的CouchD
我有 XMPP 服务器(openfire)和一堆客户端(spark),分为几个组(部门)。我正在寻找能够将它们留在 session 室中的能力。我的意思是 Skype 具有的类似功能;当用户关闭带有群
我发布这个问题是为了看看我是否正确理解 Azure Functions 中的并行性,特别是 Durable Functions。 最近使用 az cli 在 Azure Functions 中添加了设
我在 Dev Env 上有一个 AKS 集群,上面运行着一些容器。我还启用了 Azure Log Analytics。但我可以看到正在运行的当前容器的日志,而不是已被终止或停止的旧容器的日志。 我想知
在 Akka 中,当一个 actor 在处理消息时死亡(在 onReceive(...) { ... } 内),该消息就会丢失。有没有办法保证无损?有一种配置 Akka 在将消息发送到 onRecei
我试图让 selectOneMany 取得有限的成功。 我有以下数据库模型 User email Text verkey Text Maybe verified Bool password T
我需要使用持久性(Yesod)从键列表中获取实体列表 假设我有一个 Model 及其相应的 ModelId。我身边有: keys :: [ModelId] 我需要得到 models :: [Model
我有一个使用 GWT、请求工厂和地点/Activity 构建的网络应用程序。我很好奇我使用的历史 token 是否持久。该任务基本上就是让 URL 定义我的网络应用程序的确切位置(读作“文件/文件夹结
我正在寻找一种 jQuery 方法来在刷新页面时使页面元素持久保留在用户屏幕上。当我刷新页面并且丢失 jQuery 页面中的内容时,它会发生变化。 我需要页面持久。如何刷新页面并保持元素不刷新(持久)
当我尝试使用 gcc 编译带有 -fopenmp 标志的 C 代码时,我已经持续收到此错误超过 6 小时了。 错误:控制谓词无效 for ( int i = 0; i #include #ifde
我有带有验证注释的实体,例如@NotNull。我不知道如何防止容器管理的事务在批量持久操作中出现 ConstraintViolationException 的情况下回滚,例如: public void
这是我的代码: http://jsfiddle.net/KCb5z/8/embedded/result/ http://jsfiddle.net/KCb5z/8/ $(function () {
我正在与服务器通信,理想情况下,我希望输入流和输出流始终处于运行状态。我收到未经请求的响应,因此我必须始终准备好接收输入流上的数据。 在我进一步深入之前,我应该说我建立的任何连接都必须能够支持 SSL
我正在寻找一种正确扩展 Azure Functions 的方法,但遇到了问题。 我有一组 IoT 设备,通过 HTTP 向 Azure 发送数据(为此,有一组自动扩展的 Azure Functions
1.临时态(瞬时态) 不存在于session中,也不存在于数据库中的数据,被称为临时态。 比如:刚刚使用new关键字创建出的对象。 2.持久态 存在于session中,事务还未提交,提交之后
我在 Kohana v2 中使用数据库 session 驱动程序。为了使 session 持久化,Kohana 创建了一个 token cookie。这个 cookie 使用了我想的 cookie 配
有谁知道是否有办法使用 PyWinrm 打开一个持久的 PowerShell session ,该 session 保持状态并且可以多次调用?我正在尝试执行以下操作: #!/bin/python im
在运行的Elasticsearch集群中,配置文件中的index.number_of_replicas设置为1。 我可以通过运行以下命令在运行的集群上将其更新为2 # curl -XPUT "http
我在“这么长的帖子必须意味着大量的代码和配置”部分下一对一地使用指南代码。 http://blog.springsource.com/2006/08/07/using-jpa-in-spring-wi
我是一名优秀的程序员,十分优秀!