gpt4 book ai didi

c# - 仅在持久函数中扇出(并忘记)

转载 作者:行者123 更新时间:2023-11-30 13:53:56 25 4
gpt4 key购买 nike

我有一个现有的函数应用程序,其中包含 2 个函数和一个存储队列。 F1 由服务总线主题中的消息触发。对于收到的每个消息,F1 计算一些必须以不同的延迟量执行的子任务(T1,T2,...)。例如 - T1 在 3 分钟后触发,T2 在 5 分钟后触发,等等。F1 将消息发布到具有适当可见性超时的存储队列(以模拟延迟),只要消息在队列中可见,就会触发 F2。一切正常。

我现在想要迁移此应用程序以使用“耐用功能”。 F1 现在仅启动协调器。协调器代码如下 -

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
var pnTask = context.CallActivityAsync("PerformSubTask", value);
tasks.Add(pnTask);
}

//dont't await as we want to fire and forget. No fan-in!
//await Task.WhenAll(tasks);
}

[FunctionName("PerformSubTask")]
public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
{
TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

//will still keep the activity function running and incur costs??
await Task.Delay(actualDelay);

//perform subtask work after delay!
}

我只想扇出(不扇入来收集结果)并启动子任务。协调器启动所有任务并避免调用“await Task.WhenAll”。事件函数调用“Task.Delay”等待指定的时间,然后执行其工作。

我的问题

  • 在此工作流程中使用 Durable Functions 是否有意义?

  • 这是编排“扇出”工作流程的正确方法吗?

  • 我不喜欢这样的事实:事件函数运行指定的时间(3 或 5 分钟)而不执行任何操作。会产生成本吗?

  • 此外,如果需要延迟超过 10 分钟,则 no way事件函数要通过这种方法取得成功!

  • 我之前尝试避免这种情况的方法是在协调器中使用“CreateTimer”,然后将该事件添加为延续,但我在“历史记录”表中只看到计时器条目。续作不火!我是否违反了 constraint for orchestrator code -“编排器代码绝不能启动任何异步操作”?

     foreach (var value in results)
    {
    //calculate time to start
    var timeToStart = ;
    var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
    tasks.Add(pnTask);
    }

更新:使用克里斯建议的方法

计算子任务和延迟的事件

    [FunctionName("CalculateTasks")]
public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
{
//in reality time is obtained by calling an endpoint
DateTime currentTime = DateTime.UtcNow;
return new List<TaskInfo> {
new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
};
}

public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
var currentTime = context.CurrentUtcDateTime;
List<Task> tasks = new List<Task>();
foreach (var value in results)
{
TimeSpan timeDifference = currentTime - value.Origin;
TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

var timeToStart = currentTime.Add(actualDelay);

Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
tasks.Add(delayedActivityCall);
}

await Task.WhenAll(tasks);
}

简单地从协调器内部调度任务似乎是可行的。在我的例子中,我正在循环之前计算另一个事件(CalculateTasks)中的任务和延迟。我希望使用事件运行时的“当前时间”来计算延迟。我在事件中使用 DateTime.UtcNow 。在编排器中使用时,这在某种程度上表现不佳。 “ContinueWith”指定的事件不会运行,并且协调器始终处于“正在运行”状态。

我可以不使用协调器内的事件记录的时间吗?

更新2

所以克里斯建议的解决方法有效!

由于我不想收集事件的结果,因此我避免在安排所有事件后调用“await Tasks.WhenAll(tasks)”。我这样做是为了减少控制队列上的争用,即能够在需要时启动另一个编排。尽管如此,“协调器”的状态仍然是“正在运行”,直到所有事件完成运行。我猜想只有在最后一个事件向控制队列发布“完成”消息后,它才会移动到“完成”。

我说得对吗?有没有什么办法可以更早地释放协调器,即在安排所有事件之后立即释放?

最佳答案

ContinueWith 方法对我来说效果很好。我能够使用以下编排器代码模拟您的场景的一个版本:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
[OrchestrationTrigger] DurableOrchestrationContext context,
TraceWriter log)
{
var tasks = new List<Task>(10);
for (int i = 0; i < 10; i++)
{
int j = i;
DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
tasks.Add(delayedActivityCall);
}

await Task.WhenAll(tasks);
}

无论如何,这里是事件函数代码。

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
log.Warning($"{DateTime.Now:o}: {j:00}");
}

从日志输出中,我看到所有事件调用彼此相隔 10 秒。

另一种方法是扇出到多个子编排(如 @jeffhollan 建议的),这些子编排是简单的持久计时器延迟和事件调用的短序列。

更新我尝试使用您更新的示例并能够重现您的问题!如果您在 Visual Studio 中本地运行并将异常设置配置为始终在发生异常时中断,那么您应该看到以下内容:

System.InvalidOperationException: 'Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://learn.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.'

这意味着调用 context.CallActivityAsync("PerformSubtask", j) 的线程与调用 Orchestrator 函数的线程不同。我不知道为什么我最初的例子没有达到这个目的,或者为什么你的版本没有达到这个目的。它与 TPL 如何决定使用哪个线程来运行您的 ContinueWith 委托(delegate)有关 - 我需要对此进行更多研究。

好消息是有一个简单的解决方法,即指定 TaskContinuationOptions.ExecuteSynchronously ,像这样:

Task delayedActivityCall = context
.CreateTimer(timeToStart, CancellationToken.None)
.ContinueWith(
t => context.CallActivityAsync("PerformSubtask", j),
TaskContinuationOptions.ExecuteSynchronously);

请尝试一下,并告诉我这是否可以解决您所观察到的问题。

理想情况下,使用 Task.ContinueWith 时不需要执行此解决方法。我在 GitHub 中打开了一个问题来跟踪此问题:https://github.com/Azure/azure-functions-durable-extension/issues/317

Since I do not want to collect the results of the activities I avoid calling await Tasks.WhenAll(tasks) after scheduling all activities. I do this in order to reduce the contention on the control queue i.e. be able to start another orchestration if reqd. Nevertheless the status of the 'orchestrator' is still 'Running' till all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue.

这是预期的。在所有未完成的持久任务完成之前,Orchestrator 功能永远不会真正完成。没有任何办法可以解决这个问题。请注意,您仍然可以启动其他 Orchestrator 实例,只是如果它们碰巧落在同一分区上(默认情况下有 4 个分区),可能会出现一些争用。

关于c# - 仅在持久函数中扇出(并忘记),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50392874/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com