gpt4 book ai didi

c# - 具有事务的长时间运行作业 Azure 服务总线的模式

转载 作者:行者123 更新时间:2023-11-30 16:52:58 25 4
gpt4 key购买 nike

我想将事务与 Azure 服务总线一起使用,但我有一些与网络/API 绑定(bind)的消息,我根本无法可靠地进一步重构它们以使其在不到 5 分钟(允许的最大 PeekLock 持续时间)内完成。

我找不到任何允许我扩展锁的 API,所以也许还有另一种模式。

一种可能的解决方案:

1) 使用现有的实现来接收消息。如果从需要长时间运行事务的主题/队列中获取 - 使用新的 ScheduledEnqueueTimeUtc 更新消息并将其发送回服务总线。

myMessage.ScheduledEnqueueTimeUtc = TimeSpan.FromMinutes(actualLockDuration);
serviceBusClient.PublishMessage(topic, myMessage);

2)通过MessageId获取特定消息并将该新消息标记为完成。

if (oldMessage.LockedUntilUtc > DateTime.UtcNow) {
var message = FetchMessage(oldMessage.MessageId);
message.Complete();
} else {
oldMessage.Complete();
}
<小时/>

再想一想,在寻找通过 messageId 获取消息的 API 后,我没有看到。如果我通过序列 Id 获取,那么我需要一种在步骤 1 之后获取序列 Id 的方法 - 然后我需要重新考虑许多内部系统(大型消息处理、消息记录和关联等)

最佳答案

我不知道我是怎么错过这个的。 BrokeredMessage.RenewLock

我在它周围写了一个小的异步包装器来更新直到最大持续时间。

public static async Task<ProcessMessageReturn> RenewLockAfter(this Task<ProcessMessageReturn> processTask, BrokeredMessage message, int maxDuration)
{
var ss = new SemaphoreSlim(2);
var startTime = DateTime.UtcNow;
var trackedTasks = new List<Task> {processTask};
var timeoutCancellationTokenSource = new CancellationTokenSource();

while (true)
{
ss.Wait(timeoutCancellationTokenSource.Token);

if (startTime.AddMinutes(maxDuration) < DateTime.UtcNow)
{
var task = Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromTicks(message.LockedUntilUtc.Ticks - DateTime.UtcNow.AddSeconds(30).Ticks), timeoutCancellationTokenSource.Token);
await message.RenewLockAsync();
ss.Release();
}, timeoutCancellationTokenSource.Token);
trackedTasks.Add(task);
}


var completedTask = await Task.WhenAny(trackedTasks);
if (completedTask != processTask) continue;

timeoutCancellationTokenSource.Cancel();
return processTask.Result;
}

}

关于c# - 具有事务的长时间运行作业 Azure 服务总线的模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31930986/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com