gpt4 book ai didi

c# - Transform Block with parallelism and bounded capacity 延迟消息行为

转载 作者:行者123 更新时间:2023-12-04 13:13:27 26 4
gpt4 key购买 nike

TransformBlock 有一个 MaxDegreeOfParallelism > 1BoundedCapacity 不是无界的,为什么它推迟接收进一步的消息一个长时间运行的任务,尽管输入队列中有容量?

采用以下控制台应用程序。它创建了一个 MaxDegreeOfParallelism = 5BoundedCapacity = 5 的 TransformBlock,然后向它提供 100 条消息。当 block 处理消息 x == 50 时,它会将该任务延迟 10 秒。

TransformBlock<int, string> DoSomething = new TransformBlock<int, string>(async (x) => {
if (x == 50)
{
Console.WriteLine("x == 50 reached, delaying for 10 seconds.");
await Task.Delay(10000);
}
Console.WriteLine($"processed message {x}");
return x.ToString();
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });

DoSomething.LinkTo(DataflowBlock.NullTarget<string>()); // ensure we empty the transform block

for (int i = 0; i < 100; i++)
{
Stopwatch blockedTime = Stopwatch.StartNew();
await DoSomething.SendAsync(i).ConfigureAwait(false);
blockedTime.Stop();
Console.WriteLine($"Submitted {i}\tBlocked for {blockedTime.ElapsedMilliseconds}ms.");
}

DoSomething.Complete();
await DoSomething.Completion;
Console.WriteLine("Completed.");
Console.ReadKey();

结果显示消息 50-54 都被该 block 接收到。消息 51-54 已完成,然后控制台窗口在 10 秒内没有显示任何输出,然后显示消息 50 已完成并且 block 能够接收到消息 55。

...
Submitted 50 Blocked for 0ms.
Submitted 51 Blocked for 0ms.
processed message 51
Submitted 52 Blocked for 0ms.
x == 50 reached, delaying for 10 seconds.
processed message 52
processed message 53
Submitted 53 Blocked for 0ms.
Submitted 54 Blocked for 0ms.
processed message 54 // when run, 10 seconds pause happens after displaying this line
processed message 50
processed message 55
Submitted 55 Blocked for 9998ms.
...

为什么 Transform Block 不继续填充 block 直到 Bounded Capacity 为 5,并使用其他 4 个并行度继续处理消息?

ActionBlock 不会显示这些症状并继续处理其他可用并行线上的消息。

无限容量 TransformBlock 也不会显示这些症状。

最佳答案

因为默认参数EnsureOrderedtrue ,所以它试图维持结果的顺序。也就是说,它不能继续处理超过 BoundedCapacity。因为需要它来维持秩序,这就是您在测试中看到的背压

另外,一个 ActionBlock不会表现出这种行为,因为它不会输出到任何其他 block (可以这么说,这是一个死胡同),因此没有排序的概念,背压 仅受有限容量并行度 的限制。

DataflowBlockOptions.EnsureOrdered Property

By default, dataflow blocks enforce ordering on the processing ofmessages. Setting EnsureOrdered to false tells a block that it may relax this ordering if it's able to do so.This can be beneficial if making a processed result immediatelyavailable is more important than maintaining the input-to-outputordering.

解决方法是删除已排序的要求

new ExecutionDataflowBlockOptions 
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = 5 ,
EnsureOrdered = false
});

关于c# - Transform Block with parallelism and bounded capacity 延迟消息行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62669388/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com