gpt4 book ai didi

java - QueueClient 中长时间不活动且频繁出现 MessageLockLostException

转载 作者:行者123 更新时间:2023-12-03 03:59:31 26 4
gpt4 key购买 nike

背景

我们有一个使用 Azure 服务总线作为消息代理的数据传输解决方案。我们通过 x 队列从 x 数据集传输数据 - 以 x 专用 QueueClient 作为发送方。有些发件人以每两秒一条消息的速度发布消息,而另一些发件人则每 15 分钟发布一条消息。

数据源端(发送者所在)的应用程序运行良好,为我们提供了所需的吞吐量。

另一方面,我们有一个应用程序,每个队列有一个 QueueClient 接收器,配置如下:

  • maxConcurrentCalls = 1
  • autoComplete = true(如果接收模式 = RECEIVEANDDELETE)和 false(如果接收模式 = >PEEKLOCK) - 我们有一些接收器,如果它们意外关闭,则希望将消息保留在服务总线队列中。
  • maxAutoRenewDuration = 3 分钟(所有队列的锁定持续时间 = 30 秒)
  • 具有单线程的 Executor 服务

向每个接收器注册的 MessageHandler 执行以下操作:

public CompletableFuture<Void> onMessageAsync(final IMessage message) {

// deserialize the message body
final CustomObject customObject = (CustomObject)SerializationUtils.deserialize((byte[])message.getMessageBody().getBinaryData().get(0));

// process processDB1() and processDB2() asynchronously
final List<CompletableFuture<Boolean>> processFutures = new ArrayList<CompletableFuture<Boolean>>();

processFutures.add(processDB1(customObject)); // processDB1() returns Boolean
processFutures.add(processDB2(customObject)); // processDB2() returns Boolean

// join both the completablefutures to get the result Booleans
List<Boolean> results = CompletableFuture.allOf(processFutures.toArray(new CompletableFuture[processFutures.size()])).thenApply(future -> processFutures.stream()
.map(CompletableFuture<Boolean>::join).collect(Collectors.toList())

if (results.contains(false)) {
// dead-letter the message if results contains false
return getQueueClient().deadLetterAsync(message.getLockToken());
} else {
// complete the message otherwise
getQueueClient().completeAsync(message.getLockToken());
}
}

我们测试了以下场景:

场景1 - 接收模式= RECEIVEANDDELETE,消息发布速率:30/分钟

预期行为

应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。

实际行为

我们观察到 QueueClient 随机、长时间不活动 - 从几分钟到几小时不等 - 服务总线命名空间没有传出消息(在指标图表上观察到)并且没有消耗相同时间段的日志!

场景 2 - 接收模式 = PEEKLOCK,消息发布速率:30/分钟

预期行为

应该以恒定的吞吐量连续接收消息(不一定是发布消息的源的吞吐量)。

实际行为

应用程序运行 20-30 分钟后,我们不断看到 MessageLockLostException

我们尝试执行以下操作 -

  1. 我们将预取计数(从 20 * 处理速率 - 如最佳实践指南中所述)减少到最低限度(在一个测试周期内甚至减少到 0),以减少次数。为客户端锁定的消息数
  2. maxAutoRenewDuration 增加到 5 分钟 - 我们的 processDB1()processDB2() 几乎不会超过一两秒90% 的情况 - 所以,我认为 30 秒的锁定持续时间和 maxAutoRenewDuration 在这里不是问题。
  3. 删除了阻塞的 CompletableFuture.get() 并使处理同步。

这些调整都没有帮助我们解决问题。我们观察到,COMPLETERENEWMESSAGELOCK 抛出 MessageLockLostException

我们需要帮助寻找以下问题的答案:

  1. 为什么场景 1 中 QueueClient 会长时间不活动
  2. 我们如何知道由于锁确实已过期而引发 MessageLockLostException?我们怀疑锁不会过期太快,因为我们的处理会在一两秒内发生。禁用预取也没有为我们解决这个问题。

版本和服务总线详细信息

  • Java - openjdk-11-jre
  • Azure 服务总线命名空间层:标准
  • Java SDK 版本 - 3.4.0

最佳答案

对于场景 1:

如果您有 duplicate detection history启用后,根据以下解释的场景,有可能发生此行为:

enter image description here

我已启用 30 秒。我不断地向服务总线发送重复的消息(在我的例子中,来自客户端的消息具有相同的消息 ID - 30 条/每分钟)。我会看到窗口没有任何 Activity 传出。尽管消息是在服务总线上从发送客户端接收的,但我无法在传出消息中看到它们。您可以检查是否遇到了被过滤的重复消息 - 进而导致传出不活动。

enter image description here

另请注意:创建队列后,您无法启用/禁用重复检测。您只能在创建队列时执行此操作。

关于java - QueueClient 中长时间不活动且频繁出现 MessageLockLostException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63028151/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com