gpt4 book ai didi

C# BlockingCollection 生产者消费者不阻塞消费者线程

转载 作者:行者123 更新时间:2023-11-30 14:10:17 27 4
gpt4 key购买 nike

我有这样一种情况,我需要有大量(数百个)队列,其中的项目应该按顺序处理(需要单线程消费者)。我的第一个实现,based on the samples,我为每个 BlockingCollection 使用了一个长时间运行的任务来使用队列项。然而,我最终得到了一个包含数百个线程的应用程序,其中大部分线程处于空闲状态,除了消耗内存外什么都不做,因为队列大部分时间都是空的。

我认为仅当队列中有要处理的内容时才运行消费者任务会更好,但是,我一直无法找到提供最佳实践的示例。

我想出了一个类似于下面的解决方案。但问题是,每个项目都会产生一个新任务(也许这是低效的?浪费资源?)。但是,如果我不为每个项目创建一个新任务,我就不能保证一个项目不会在未处理的情况下坐在队列中。

    private object _processSyncObj = new object();
private volatile bool _isProcessing;
private BlockingCollection<string> _queue = new BlockingCollection<string>();

private void EnqueueItem(string item)
{
_queue.Add(item);
Task.Factory.StartNew(ProcessQueue);
}

private void ProcessQueue()
{
if (_isProcessing)
return;

lock (_processSyncObj)
{
string item;
while (_isProcessing = _queue.TryTake(out item))
{
// process item
}
}
}

对于这种情况,有什么最佳实践/最佳解决方案可以保证不存在项目在队列中但没有消费者正在运行的情况?

最佳答案

我认为您所做的是合理的,因为 Task 也可以很好地扩展到数百万个任务,针对 ThreadPool 生成内部子队列,避免过多的上下文切换。

Behind the scenes, tasks are queued to the ThreadPool, which has been enhanced with algorithms that determine and adjust to the number of threads and that provide load balancing to maximize throughput. This makes tasks relatively lightweight, and you can create many of them to enable fine-grained parallelism.

Task Parallelism (Task Parallel Library)

...但是你所做的,将以正常的任务编程结束,因为对于每个入队你都会启动一个任务,所以阻塞集合是完全未使用的。据了解,您关心的是触发任务并让 TaskScheduler 在作业到达时按顺序运行作业。

您知道您还可以自定义 TaskScheduler 吗?

只使用一个任务编程模式,加上一个自定义的 TaskScheduler 来控制计划任务的流程怎么样?

例如,您可以创建一个 OrderedTaskScheduler,它派生自 LimitedConcurrencyLevelTask​​Scheduler,其行为如下...

The LimitedConcurrencyLevelTaskScheduler class offers a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool. It is necessary to set the maximum degree of parallelism desired for this scheduler.

The OrderedTaskScheduler class provides a task scheduler that ensures only one task is executing at a time. Tasks execute in the order that they were queued (FIFO). It is a subclass of LimitedConcurrencyLevelTaskScheduler that sends 1 as a parameter for its base class constructor.

你可以发现这些调度器已经开发好了,它们叫做ParallelExtensionsExtras ,您可以从here下载。 ,并从中阅读一些关于它的知识 blog postothers .

您也可以直接在nuget 上找到它和 github 上的代码镜像.

尽情享受吧! :)

关于C# BlockingCollection 生产者消费者不阻塞消费者线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24827712/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com