gpt4 book ai didi

c# - 替代具有保证交付的 Dataflow BroadcastBlock

转载 作者:太空狗 更新时间:2023-10-29 21:39:12 26 4
gpt4 key购买 nike

我需要某种类似于 BroadcastBlock 的对象,但可以保证传送。所以我使用了 this question 的答案.但我并不是很清楚这里的执行流程。我有一个控制台应用程序。这是我的代码:

static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));

ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);

broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});

Task producer = Produce(broadcaster);
List<Task> ToWait = new List<Task>();
foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
ToWait.Add(producer);

Task.WaitAll(ToWait.ToArray());

Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

broadcaster.Complete();
}

每个数字都必须按顺序处理,所以我不能在 broadcaster block 中使用 MaxDegreeOfParallelism。但是所有接收到数字的 Action block 都可以并行运行。

问题来了:

在输出中我可以看到不同的线程 ID。我理解正确吗,它的工作原理如下:

执行命中广播中的 await block.SendAsync(num);。如果当前 block 尚未准备好接受该号码,则执行退出广播程序并卡在 Task.WaitAll 处。当 block 接受数字时,broadcaster 中的 foreach 语句的其余部分在线程池中执行。直到最后都一样。foreach 的每次迭代都在线程池中执行。但实际上它是顺序发生的。

我的理解是对还是错?我如何更改此代码以将数字异步发送到所有 block ?

为了确保如果其中一个区 block 目前还没有准备好接收号码,我不会等待它,所有其他准备就绪的区 block 都会收到号码。并且所有 block 都可以并行运行。并保证交期。

最佳答案

假设您希望 broadcaster 一次处理一个项目,同时使目标 block 能够同时接收该项目,您需要更改 broadcaster 以提供数量同时到所有 block ,然后异步等待所有 block 一起在继续下一个数字之前接受它:

var broadcaster = new ActionBlock<int>(async num => 
{
var tasks = new List<Task>();
foreach (var block in blocks)
{
tasks.Add(block.SendAsync(num));
}
await Task.WhenAll(tasks);
}, execopt);

现在,在等待之后没有工作的情况下,您可以稍微优化一下,同时仍然返回一个等待任务:

ActionBlock<int> broadcaster = new ActionBlock<int>(
num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);

关于c# - 替代具有保证交付的 Dataflow BroadcastBlock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25082064/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com