gpt4 book ai didi

c# - Durable Orchestrator 在 Task.WhenAll 之后卡住

转载 作者:行者123 更新时间:2023-12-03 01:19:31 26 4
gpt4 key购买 nike

我有一个有 4 个事件的协调器:

  1. PromoDataFromActpm - 下载一些数据的单个事件来自 API。
  2. PromoDataExport - 从事件 #1 发送数据的单个事件到 Azure 存储
  3. SavePromoProductFromACPSActivity - 并行事件,用于事件 #1 中的每个项目都会调用一些 API 并下载一些数据
  4. TableToBlobPromoProductActivity - 写入的并行事件从事件 #3 到 Blob 存储的项目

对于事件 #3,集合中的每个项目都是 1 个事件调用,对于事件 #4,它会在每个事件调用 50 个项目的集合中进行批处理,这些项目正在由 Task.WhenAll 等待。

在本地环境中一切正常,但在 azure 上,orchestrator 在事件 #3 Task.WhenAll 之后由于某种原因停止处理。我在日志中收到许多对 SavePromoProductFromACPSActivity 的请求,这是应该的,但一段时间后它们会停止,并且 TableToBlobPromoProductActivity 事件永远不会被调用。我只是偶尔收到“正在执行 XYZ 协调器”,几分钟后“已执行 XYZ 协调器”,这些消息之间没有事件调用。

我已经和它斗争了一段时间,但没有成功。有什么想法吗?

这是代码:

            var orchestrationId = context.InstanceId.Replace(":","");

var promoData = await context.CallActivityAsync<PromotionExportModel[]>(FunctionNamesExport.Activity.PromoDataFromActpm, null);
var exportResult = await context.CallActivityAsync<OperationResponse>(FunctionNamesExport.Activity.PromoDataExport, promoData);

var acpsTasks = new List<Task<List<PromotedProductExportModel>>>();
var acpsPromos = new List<PromotedProductExportModel>();
foreach (var promo in promoData)
{
acpsTasks.Add(context.CallActivityAsync<List<PromotedProductExportModel>>(FunctionNamesExport.Activity.SavePromoProductFromACPSActivity, promo));
}
await Task.WhenAll(acpsTasks);
acpsTasks.ForEach(x => acpsPromos.AddRange(x.Result));

var promoDataBatched = acpsPromos.Batch(50);
var tasks = new List<Task>();
foreach(var arr in promoDataBatched)
{
var promoBlob = new PromotionExportSubModel
{
PromotionExportModel = arr.ToArray(),
blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray()),
orchestrationId = orchestrationId
};
tasks.Add(context.CallActivityAsync(FunctionNamesExport.Activity.TableToBlobPromoProductActivity, promoBlob));
}
await Task.WhenAll(tasks);

最佳答案

由于您在评论中提到您将在第 3 步中并行启动 3000 个事件,因此我知道可能会发生什么。请记住,每个事件都会将行添加到历史表中,所有这些行都需要在协调器的每次重播时加载(在每个事件返回后发生)。因此加载时间会不断增加,内存使用量也会不断增加。

我为此使用的一个典型解决方案是将工作拆分为子编排器。例如,将数据分成 100 个批处理,为每个批处理启动一个子编排器以并行运行。然后在该子协调器中执行实际的第 3 步事件。这样,子编排器实例的历史表行的上限为 100 个事件所需的行,而主编排器仅获得约 30 个结果。您可以对第 4 步执行类似的操作。

关于c# - Durable Orchestrator 在 Task.WhenAll 之后卡住,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72175369/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com