gpt4 book ai didi

c# - TPL 数据流向所有消费者重复消息

转载 作者:行者123 更新时间:2023-11-30 15:13:25 25 4
gpt4 key购买 nike

我目前正在使用 WPF 和 TPL 数据流编写应用程序,它应该执行以下操作:

  1. 加载目录中的所有文件
  2. 一旦开始处理,将一些内容记录到用户界面并处理每个文件
  3. 完成后向用户界面记录一些内容

问题是 UI 的日志记录需要发生在 UI 线程中,并且只在它开始处理之前记录。

我现在能够做到这一点的唯一方法是从 TPL 转换 block 内部手动调用调度程序并更新 UI:

Application.Current.Dispatcher.Invoke(new Action(() =>
{
ProcessedFiles.Add(optimizedFileResult);
}));

我想通过在 UI 线程上运行的 DataFlow block 来执行此操作:

ExecutionDataflowBlockOptions.TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

但是,如果我在优化发生的 block 上设置它,优化也将运行单线程。

另一方面,如果我在 Processing block 之前创建一个新 block 并在那里调用它。它会在实际开始之前开始说“正在处理”。

示例代码

我创建了一些示例代码来重现此问题:

public class TplLoggingToUiIssue
{
public TplLoggingToUiIssue()
{

}

public IEnumerable<string> RecurseFiles()
{
for (int i = 0; i < 20; i++)
{
yield return i.ToString();
}
}

public async Task Go()
{
var block1 = new TransformBlock<string, string>(input =>
{
Console.WriteLine($"1: {input}");
return input;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 4,
BoundedCapacity = 10,
EnsureOrdered = false
});

var block2 = new TransformBlock<string, string>(input =>
{
Console.WriteLine($"2: {input}\t\t\tStarting {input} now (ui logging)");
return input;
}, new ExecutionDataflowBlockOptions()
{
//TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(), (Doesn't work in Console app, but you get the idea)
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1,
EnsureOrdered = false
});


var block3 = new TransformBlock<string, string>(async input =>
{
Console.WriteLine($"3 start: {input}");
await Task.Delay(5000);
Console.WriteLine($"3 end: {input}");
return input;
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 2,
BoundedCapacity = 10,
EnsureOrdered = false
});

var block4 = new ActionBlock<string>(input =>
{
Console.WriteLine($"4: {input}");
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
BoundedCapacity = 1,
EnsureOrdered = false
});


block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });


var files = RecurseFiles();
await Task.Run(async () =>
{
foreach (var file in files)
{
Console.WriteLine($"Posting: {file}");
var result = await block1.SendAsync(file);

if (!result)
{
Console.WriteLine("Result is false!!!");
}
}
});

Console.WriteLine("Completing");
block1.Complete();
await block4.Completion;
Console.WriteLine("Done");
}
}

如果您运行此示例(只有 6 个"file"),您将获得以下输出:

Posting: 0
Posting: 1
Posting: 2
Posting: 3
Posting: 4
Posting: 5
1: 2
1: 1
1: 3
1: 0
1: 4
1: 5
2: 2 Starting 2 now (ui logging)
Completing
3 start: 2
2: 0 Starting 0 now (ui logging)
3 start: 0
2: 3 Starting 3 now (ui logging)
2: 1 Starting 1 now (ui logging)
2: 4 Starting 4 now (ui logging)
2: 5 Starting 5 now (ui logging)
3 end: 2
3 end: 0
3 start: 3
3 start: 1
4: 2
4: 0
3 end: 3
3 end: 1
4: 3
3 start: 4
3 start: 5
4: 1
3 end: 5
3 end: 4
4: 5
4: 4
Done

从这个输出中可以看出,它的记录开始得太早了。我也试过使用 Broadcast block ,但这会覆盖值,因此它们会丢失。

理想的情况是以某种方式让日志记录 block 等到处理 block 有容量,然后推送一项。

最佳答案

如其他答案所示,有几种方法可以解决这个问题。我想指出一个替代方案:使用 Progress<T>为了那个原因。尽管它被设计为最适合与 Tasks 一起使用,但它也适用于 Dataflow,如下所示:

        private void Form1_Load(object sender, EventArgs e)
{
var progressReporter = new Progress<string>();
progressReporter.ProgressChanged += (reporter, message) => label1.Text = message;

var b1 = new ActionBlock<string>((input) =>
{
((IProgress<string>)progressReporter).Report(input);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
});

b1.Post("a");
b1.Post("b");
b1.Post("c");
b1.Post("d");
}

总的来说,这看起来是一个干净的替代方案,无需为各个 block 添加一些管道。

更多信息可以在这个优秀的 blogpost 中找到

关于c# - TPL 数据流向所有消费者重复消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58978894/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com