gpt4 book ai didi

c# - 使用 4.0 框架类和阻塞集合的 C# 中的生产者/混合消费者

转载 作者:行者123 更新时间:2023-11-30 12:25:55 28 4
gpt4 key购买 nike

我有一个生产者/消费者场景。生产者从不停止,这意味着即使有一段时间 BC 中没有项目,也可以稍后添加更多项目。

从 .NET Framework 3.5 迁移到 4.0,我决定使用 BlockingCollection 作为消费者和生产者之间的并发队列。我什至添加了一些并行扩展,这样我就可以将 BC 与 Parallel.ForEach 一起使用。

问题是,在消费者线程中,我需要有一种混合模型:

  1. 我总是检查 BC 以处理任何到达的项目Parallel.ForEach(bc.GetConsumingEnumerable(), item => 等
  2. 在这个 foreach 中,我执行了所有相互不依赖的任务。
  3. 问题来了。在将前面的任务并行化之后,我需要按照它们在 BC 中的相同 FIFO 顺序来管理它们的结果。这些结果的处理应该在同步线程中进行。

伪代码的一个小例子如下:

制作人:

//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
//The object to add has a property with the sequence number
_concurrentCollection.TryAdd(scannedPage);
}

消费者:

private void Init()
{
_cancelTasks = false;
_checkTask = Task.Factory.StartNew(() =>
{
while (!_cancelTasks)
{
//BlockingCollections with Parallel ForEach
var bc = _concurrentCollection;
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.

}
});
}

显然,这不能正常工作,因为 .GetConsumingEnumerable() 会阻塞,直到 BC 中有另一个项目。我假设我可以通过任务来完成,并且在同一批处理中触发 4 或 5 个任务,但是:

  1. 我怎么能对任务执行此操作,并且在任务开始之前仍然有一个等待点,该等待点会阻塞直到 BC 中有要消耗的项目(如果什么都没有,我不想开始处理。曾经在 BC 中有一些东西我会开始这批 4 个任务,并在每个任务中使用 TryTake 这样如果没有什么可拿的他们就不会阻塞,因为我不知道是否我总能达到 BC 中的项目数作为任务批处理,例如,BC 中只剩下一个项目和一批 4 个任务)?
  2. 我如何做到这一点并利用 Parallel.For 提供的效率?
  3. 如何按照从 BC 中提取项目的相同 FIFO 顺序保存任务的结果?
  4. 是否有任何其他并发类更适合消费者中的这种项目混合处理?
  5. 此外,这是我在 StackOverflow 中提出的第一个问题,所以如果您需要更多数据或者您只是认为我的问题不正确,请告诉我。

最佳答案

我想我按照你的要求做了,为什么不创建一个 ConcurrentBag 并在像这样处理时添加到它:

while (!_cancelTasks)
{
//BlockingCollections with Paralell ForEach
var bc = _concurrentCollection;
var q = new ConcurrentBag<ScannedPage>();
Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
{
ScannedPage currentPage = item;
q.Add(item);
// process a batch of images from the bc and check if an image has a valid barcode. T
});
//Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.


//process items in your list here by sorting using some sequence key
var items = q.OrderBy( o=> o.SeqNbr).ToList();
foreach( var item in items){
...
}
}

这显然不会按照它们添加到 BC 中的确切顺序将它们排入队列,但您可以像 Alex 建议的那样向 ScannedPage 对象添加一些序列 nbr,然后对结果进行排序。

这是我处理序列的方式:

将此添加到 ScannedPage 类:

public static int _counter;  //public because this is just an example but it would work.

获取序列nbr并在此处赋值:

private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{
lock( this){ //to single thread this process.. not necessary if it's already single threaded of course.
System.Threading.Interlocked.Increment( ref ScannedPage._counter);
scannedPage.SeqNbr = ScannedPage._counter;
...
}
}

关于c# - 使用 4.0 框架类和阻塞集合的 C# 中的生产者/混合消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30359040/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com