gpt4 book ai didi

c# - 未续订 Azure 服务总线消息锁?

转载 作者:行者123 更新时间:2023-12-05 07:26:18 25 4
gpt4 key购买 nike

我构建了一个服务来支持 Azure 服务总线中的多个队列订阅,但我遇到了一些奇怪的行为。

我的订阅单例类有一个如下所示的方法:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
{
try
{
var messageLifespan = TimeSpan.FromSeconds(ttl);
var messageType = typeof(TMessage);
if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
{
subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
_activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
}

var messageHandlerOptions = new MessageHandlerOptions(OnException)
{
MaxConcurrentCalls = maxDop,
AutoComplete = false,
MaxAutoRenewDuration = messageLifespan,
};


subscriptionClient.RegisterMessageHandler(
async (azureMessage, cancellationToken) =>
{
try
{
var textPayload = _encoding.GetString(azureMessage.Body);
var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
if (message == null)
throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
await execution.Invoke(message);
await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
}
}
, messageHandlerOptions);
}
catch (Exception ex)
{
_logger.LogError(ex, "Subscribe(Action<TMessage>)");
throw;
}
}

想法是,您为特定类型的消息订阅 Azure 服务总线,它直接对应于一个队列。在您的订阅中,您传递了一个委托(delegate)来决定如何处理消息。

这似乎有效……但有一个警告。

无论我为 MaxAutoRenewDurationOperationTimeout 设置什么 ttl,在任何给定消息的长期运行过程中,一分钟后,消息从队列中解锁,另一个订阅者拿起它并开始处理它。

我的理解是,这正是 MaxAutoRenewDuration 应该阻止的……但它似乎并没有阻止任何事情。

谁能告诉我我需要采取哪些不同的措施来确保消费者在整个过程中拥有消息的所有权?

最佳答案

我可以想到一些您可能想要查看的选项。

  1. 不要在 SubscriptionClient 中使用默认的 ReceiveMode = PeekLock,而是将其设置为 ReceiveAndDelete,这样一旦消息被消费,它就会从队列中移除,并且不会被消费任何其他客户端,这确实意味着您必须优雅地处理异常并自行重试;

  2. 查看 OperationTimeout,根据 doco,它是 单个操作超时的持续时间

关于c# - 未续订 Azure 服务总线消息锁?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54429040/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com