gpt4 book ai didi

c# - TPL 数据流工作流

转载 作者:行者123 更新时间:2023-11-30 17:42:19 27 4
gpt4 key购买 nike

我刚开始阅读 TPL 数据流,它真的让我感到困惑。我读了很多关于这个主题的文章,但我无法轻易消化。可能很难,也可能是我还没有开始理解这个想法。

我开始研究这个的原因是我想实现一个可以运行并行任务但按顺序运行的场景,我发现 TPL 数据流可以用作此场景。

我正在练习 TPL 和 TPL Dataflow,并且处于非常初级的水平,所以我需要专家的帮助,他们可以指导我走向正确的方向。在我编写的测试方法中,我做了以下事情,

private void btnTPLDataFlow_Click(object sender, EventArgs e)
{
Stopwatch watch = new Stopwatch();
watch.Start();

txtOutput.Clear();

ExecutionDataflowBlockOptions execOptions = new ExecutionDataflowBlockOptions();
execOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

ActionBlock<string> actionBlock = new ActionBlock<string>(async v =>
{
await Task.Delay(200);
await Task.Factory.StartNew(

() => txtOutput.Text += v + Environment.NewLine,
CancellationToken.None,
TaskCreationOptions.None,
scheduler
);

}, execOptions);

for (int i = 1; i < 101; i++)
{
actionBlock.Post(i.ToString());
}

actionBlock.Complete();

watch.Stop();
lblTPLDataFlow.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
}

现在该过程是并行的并且都是异步的(不卡住我的 UI)但是生成的输出不按顺序,而我已经读到 TPL 数据流默认保持元素的顺序。所以我的猜测是,我创建的任务是罪魁祸首,它没有以正确的顺序输出字符串。我说得对吗?

如果是这种情况,那么我该如何使这个异步并使两者都有序?

我曾尝试分离代码并尝试将代码分发到不同的方法中,但我的尝试失败了,因为只有字符串输出到文本框,没有其他任何事情发生。

 private async void btnTPLDataFlow_Click(object sender, EventArgs e)
{
Stopwatch watch = new Stopwatch();
watch.Start();

await TPLDataFlowOperation();

watch.Stop();
lblTPLDataFlow.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
}

public async Task TPLDataFlowOperation()
{
var actionBlock = new ActionBlock<int>(async values => txtOutput.Text += await ProcessValues(values) + Environment.NewLine,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler = scheduler });

for (int i = 1; i < 101; i++)
{
actionBlock.Post(i);
}

actionBlock.Complete();
await actionBlock.Completion;
}

private async Task<string> ProcessValues(int i)
{
await Task.Delay(200);
return "Test " + i;
}

我知道我写了一段糟糕的代码,但这是我第一次尝试使用 TPL 数据流。

最佳答案

How do I make this Asynchronous and in order?

这有点矛盾。您可以让并发任务按顺序开始,但您不能真正保证它们会按顺序运行或完成。

让我们检查一下您的代码,看看发生了什么。

首先,您选择了 DataflowBlockOptions.Unbounded。这告诉 TPL Dataflow 它不应该限制它允许并发运行的任务数。因此,您的每项任务都将按顺序或多或少同时开始

您的异步操作以 await Task.Delay(200) 开始。这将导致您的方法暂停,然后在大约 200 毫秒后恢复。但是,此延迟准确,并且会因一次调用而异。此外,您的代码在延迟后恢复的​​机制可能需要不同的时间。由于实际延迟的这种随机变化,下一段要运行的代码现在顺序——导致您看到的差异。

您可能会发现这个示例很有趣。这是一个控制台应用程序,可以稍微简化一些事情。

class Program
{
static void Main(string[] args)
{
OutputNumbersWithDataflow();
OutputNumbersWithParallelLinq();

Console.ReadLine();
}

private static async Task HandleStringAsync(string s)
{
await Task.Delay(200);
Console.WriteLine("Handled {0}.", s);
}

private static void OutputNumbersWithDataflow()
{
var block = new ActionBlock<string>(
HandleStringAsync,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

for (int i = 0; i < 20; i++)
{
block.Post(i.ToString());
}

block.Complete();

block.Completion.Wait();
}

private static string HandleString(string s)
{
// Perform some computation on s...
Thread.Sleep(200);

return s;
}

private static void OutputNumbersWithParallelLinq()
{
var myNumbers = Enumerable.Range(0, 20).AsParallel()
.AsOrdered()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.NotBuffered);

var processed = from i in myNumbers
select HandleString(i.ToString());

foreach (var s in processed)
{
Console.WriteLine(s);
}
}
}

第一组数字是使用与您的方法非常相似的方法计算的——使用 TPL Dataflow。数字乱序。

第二组数字,由 OutputNumbersWithParallelLinq() 输出,根本不使用 Dataflow。它依赖于 .NET 中内置的 Parallel LINQ 功能。这会在后台线程上运行我的 HandleString() 方法,但保持数据的顺序直到最后

这里的限制是 PLINQ 不允许您提供异步方法。 (好吧,你可以,但它不会给你想要的行为。) HandleString() 是一个传统的同步方法;它只是在后台线程上执行。

下面是一个更复杂的数据流示例,它确实保留了正确的顺序:

private static void OutputNumbersWithDataflowTransformBlock()
{
Random r = new Random();
var transformBlock = new TransformBlock<string, string>(
async s =>
{
// Make the delay extra random, just to be sure.
await Task.Delay(160 + r.Next(80));
return s;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// For a GUI application you should also set the
// scheduler here to make sure the output happens
// on the correct thread.
var outputBlock = new ActionBlock<string>(
s => Console.WriteLine("Handled {0}.", s),
new ExecutionDataflowBlockOptions
{
SingleProducerConstrained = true,
MaxDegreeOfParallelism = 1
});

transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 20; i++)
{
transformBlock.Post(i.ToString());
}

transformBlock.Complete();

outputBlock.Completion.Wait();
}

关于c# - TPL 数据流工作流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32071401/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com