gpt4 book ai didi

c# - 流式数据 BlockingCollection

转载 作者:行者123 更新时间:2023-11-30 12:14:38 25 4
gpt4 key购买 nike

Stephen Toub 的书第 88 页

http://www.microsoft.com/download/en/details.aspx?id=19222

有代码

private BlockingCollection<T> _streamingData = new BlockingCollection<T>();
// Parallel.ForEach
Parallel.ForEach(_streamingData.GetConsumingEnumerable(),
item => Process(item));
// PLINQ
var q = from item in _streamingData.GetConsumingEnumerable().AsParallel()
...
select item;

然后斯蒂芬提到

"when passing the result of calling GetConsumingEnumerable as the data source to Parallel.ForEach, the threads used by the loop have the potential to block when the collection becomes empty. And a blocked thread may not be released by Parallel.ForEach back to the ThreadPool for retirement or other uses. As such, with the code as shown above, if there are any periods of time where the collection is empty, the thread count in the process may steadily grow;"

我不明白为什么线程数会增加?

如果集合为空,那么 blockingcollection 不会请求任何进一步的线程吗?

因此您不需要执行 WithDegreeOfParallelism 来限制 BlockingCollection 上使用的线程数

最佳答案

线程池有一个爬山算法,用于估计合适的线程数。只要增加线程会增加吞吐量,线程池就会创建更多的线程。它将假设发生了一些阻塞或 IO,并尝试通过遍历系统中的处理器数量来使 CPU 饱和。

这就是为什么在线程池线程上执行 IO 和阻塞操作可能很危险。

这是上述行为的完整示例:

        BlockingCollection<string> _streamingData = new BlockingCollection<string>();

Task.Factory.StartNew(() =>
{
for (int i = 0; i < 100; i++)
{
_streamingData.Add(i.ToString());
Thread.Sleep(100);
}
});

new Thread(() =>
{
while (true)
{
Thread.Sleep(1000);
Console.WriteLine("Thread count: " + Process.GetCurrentProcess().Threads.Count);
}
}).Start();

Parallel.ForEach(_streamingData.GetConsumingEnumerable(), item =>
{
});

虽然吞吐量没有增加,但我不知道为什么线程数一直在攀升。根据我解释的模型,它不会增长。但我不知道我的模型是否真的正确。

也许线程池有一个额外的启发式方法,如果它看不到任何进展(以每秒完成的任务来衡量),它就会生成线程。这是有道理的,因为这可能会防止应用程序中出现很多死锁。如果重要任务因等待现有任务退出并使线程可用而无法运行,则可能会发生死锁。这是线程池的一个众所周知的问题。

关于c# - 流式数据 BlockingCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9045028/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com