gpt4 book ai didi

activemq - 如何让我的 ActiveMQ 代理删除离线持久订阅者

转载 作者:行者123 更新时间:2023-12-02 22:08:23 29 4
gpt4 key购买 nike

我们有一个 ActiveMQ 代理,它使用 JMS、AMQP 和 MQTT 连接到不同的客户端。由于某种原因,我们还没有弄清楚一组特定的 MQTT 客户端经常(并非总是)持久订阅。这是一个测试环境,其中经常添加和删除客户端,后者有时是通过拔掉插头或重新启动嵌入式设备来实现的,以便它们无法正确取消订阅。效果(IIUC)是经纪人为可能永远不会再看到的设备堆积“离线持久订阅”(我可以在 http://my_broker:8161/admin/subscribers.jsp 下看到这些),永远保留有关这些主题的消息,直到它最终在自己的情况下崩溃内存占用。

这里的问题是订阅者会持续订阅,我们需要找出原因。然而,我们还决定,客户这样做(无意中)不应该使经纪商陷入停顿,因此我们需要独立解决这个问题。

我找到了there are settings for a timeout for offline durable subscriptions并将它们放入我们的代理配置中(最后两行):

<broker
xmlns="http://activemq.apache.org/schema/core"
brokerName="my_broker"
dataDirectory="${activemq.data}"
useJmx="true"
advisorySupport="false"
persistent="false"
offlineDurableSubscriberTimeout="1800000"
offlineDurableSubscriberTaskSchedule="60000">

如果我理解正确的话,上面应该每分钟检查一次并解雇半小时没有见过的客户。然而,与文档相反,这似乎不起作用:我订阅的消费者在几天前拔掉插头仍然在离线持久订阅者列表中可见,代理的内存占用量不断增加,如果我在经纪人的网络界面中手动删除订阅者我可以看到内存占用量下降。

这是我的问题:

  1. 什么决定了 ActiveMQ 代理上主题的 MQTT 订阅是否持久?
  2. 我在 ActiveMQ 设置中设置删除持久脱机订阅的超时时做错了什么?

最佳答案

我提取了删除超时持久订阅的相关代码 (doCleanup())。

成功的情况下,执行:

    LOG.info("Destroying durable subscriber due to inactivity: {}", sub);

在失败的情况下,它执行:

    LOG.error("Failed to remove inactive durable subscriber", e);

在日志文件中查找上述日志行,并将其与您使用 admin/subscribers.jsp 查看器观察到的详细信息进行匹配。如果它不打印任何行,则订阅可能由于某种原因保持事件状态,或者您可能遇到了错误。

另外,如果可以的话,您可以尝试删除经纪人名称中的下划线(_)吗?该手册讨论了经纪人名称中下划线的问题。

代码:

public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
this.cleanupTask = new TimerTask() {
@Override
public void run() {
doCleanup();
}
};
this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(),broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
}
}

public void doCleanup() {
long now = System.currentTimeMillis();
for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
DurableTopicSubscription sub = entry.getValue();
if (!sub.isActive()) {
long offline = sub.getOfflineTimestamp();
if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
try {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(entry.getKey().getClientId());
info.setSubscriptionName(entry.getKey().getSubscriptionName());
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
context.setClientId(entry.getKey().getClientId());
removeSubscription(context, info);
} catch (Exception e) {
LOG.error("Failed to remove inactive durable subscriber", e);
}
}
}
}
}

// The toString method for DurableTopicSubscription class
@Override
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
}

关于activemq - 如何让我的 ActiveMQ 代理删除离线持久订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40529562/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com