gpt4 book ai didi

c# - 尝试在长时间运行的生成器上使用 PLINQ 的陷阱?

转载 作者:太空狗 更新时间:2023-10-29 21:46:27 24 4
gpt4 key购买 nike

我有几个无限生成器方法,包括一些长时间运行和无限长时间运行的生成器。

IEnumerable<T> ExampleOne() { 
while(true) // this one blocks for a few seconds at a time
yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() {
while(true) //this one blocks for a really long time
yield return OtherLongRunningFunction();
}

我的目标是拥有一个组合两个示例中的项目的无限序列。这是我使用 PLINQ 尝试过的:

 IEnumerable<T> combined =  new[] { ExampleOne(), ExampleTwo() }
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.SelectMany(source => source.GetRequests());

这似乎将两个 IEnumerables 适本地组合成一个新的,来自 IEnumerable #1 和 #2 的项目只要出现在两个源 IEnumerables 中的任何一个中就可用:

//assuming ExampleTwo yields TWO but happens roughly 5 times 
//less often then ExampleOne
Example output: one one one one one TWO one one one one one one TWO

但是,似乎有时(通常在运行许多小时之后)OtherLongRunningFunction() 会在很长一段时间内不返回,并且在以下条件下难以重现,combined 序列将阻塞它,而不是继续从第一个 LongRunningFunction 返回结果。看起来虽然组合并行查询开始使用两个线程,但后来它决定切换到一个线程。

我的第一个想法是“这可能是 RX Observable.Merge 而不是 PLINQ 的工作。”但是我很感激这两个答案,这些答案显示了处理这种情况的正确替代方法,以及关于 PLINQ 如何在查询开始后数小时内更改并行度的机制的解释。

最佳答案

这是 Rx 的方法,事实上,它确实使用了 Merge:

IObservable<T> LongRunningFunction()
{
return Observable.Start(() => {
// Calculate some stuff
return blah;
}, Scheduler.TaskPoolScheduler);
}

Observable.Merge(
Observable.Defer(LongRunningFunction).Repeat(),
Observable.Defer(OtherLongRunningFunction).Repeat(),
).Subscribe(x => {
Console.WriteLine("An item: {0}", x);
});

关于c# - 尝试在长时间运行的生成器上使用 PLINQ 的陷阱?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8998799/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com