
c# - How to consume a BlockingCollection<T> in batches

Reposted. Author: 可可西里. Updated: 2023-11-01 09:15:48

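I've come up with some code to consume all the items waiting in a queue. Rather than processing the items one by one, it makes sense to process all the waiting items as a set.

I've declared my queue like this: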

private BlockingCollection<Item> items =
    new BlockingCollection<Item>(new ConcurrentQueue<Item>());

Then, on the consumer thread, I plan to read the items in batches like this:

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while (this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}

This approach lacks the utility of GetConsumingEnumerable, and I can't help wondering whether I've missed a better way or whether my approach is flawed.
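One way to get GetConsumingEnumerable-style foreach consumption back while staying on BlockingCollection<T> is to wrap the TryTake loop from the question in an iterator. This is a minimal sketch assuming C# 9+ top-level statements; GetConsumingBatches and its maxBatchSize cap are hypothetical names chosen for illustration, not part of the BCL.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not a BCL API): the question's TryTake loop as an iterator,
// so callers can foreach over batches the way they would over GetConsumingEnumerable.
static IEnumerable<List<T>> GetConsumingBatches<T>(
    BlockingCollection<T> source, int maxBatchSize, CancellationToken token = default)
{
    // Blocks until at least one item is available; returns false once
    // CompleteAdding() has been called and the collection is empty.
    while (source.TryTake(out T first, Timeout.Infinite, token))
    {
        var batch = new List<T> { first };

        // Drain whatever else is already waiting, without blocking, up to the cap.
        while (batch.Count < maxBatchSize && source.TryTake(out T next))
            batch.Add(next);

        yield return batch;
    }
}

var items = new BlockingCollection<int>(new ConcurrentQueue<int>());
for (int i = 0; i < 5; i++)
    items.Add(i);
items.CompleteAdding();

var batchSizes = new List<int>();
// maxBatchSize = 2, so five waiting items are split into batches of 2, 2 and 1
foreach (var chunk in GetConsumingBatches(items, 2))
{
    Console.WriteLine(string.Join(", ", chunk));
    batchSizes.Add(chunk.Count);
}
```

Only the first TryTake in each round blocks, so each batch holds whatever accumulated while the previous batch was being processed, which matches the behaviour of the question's loop.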

Is there a better way to consume a BlockingCollection<T> in batches?

Best answer

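One solution is to use BufferBlock<T> from System.Threading.Tasks.Dataflow (included in .NET Core 3+). It doesn't use GetConsumingEnumerable(), but it still gives you the same utility, mainly:

  • parallel processing with multiple (symmetric and/or asymmetric) consumers and producers
  • thread safety (which is what allows the above), so there is no need to worry about race conditions
  • cancellation via a cancellation token and/or by completing the collection
  • consumers wait asynchronously (via OutputAvailableAsync) until data is available, so no CPU cycles are wasted on polling

There is also BatchBlock<T>, but it restricts you to fixed-size batches.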
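A minimal sketch of that fixed-size alternative, assuming C# 9+ top-level statements: with a batch size of 4 and ten posted items, BatchBlock<T> emits two full batches, and calling Complete() flushes the two leftover items as a smaller final batch. A short batch otherwise only appears when TriggerBatch() is called, which is the limitation mentioned above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Every output array holds exactly 4 items, except the final one
// that is produced when the block is completed.
var batcher = new BatchBlock<int>(4);
for (int i = 0; i < 10; i++)
    batcher.Post(i);

// Completing the block pushes out the remaining 2 items as a last, smaller batch.
batcher.Complete();

var batchSizes = new List<int>();
// OutputAvailableAsync returns false once the block is completed and drained.
while (await batcher.OutputAvailableAsync())
{
    if (batcher.TryReceive(out int[] batch))
        batchSizes.Add(batch.Length);
}

Console.WriteLine(string.Join(", ", batchSizes));
```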

var buffer = new BufferBlock<Item>();

// must run inside an async method
while (await buffer.OutputAvailableAsync())
{
    if (buffer.TryReceiveAll(out var items))
    {
        // process items
    }
}
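To make the batching behaviour of TryReceiveAll concrete, here is a small self-contained sketch of the loop above, assuming C# 9+ top-level statements and int items in place of the question's Item type. Five items are posted before the consumer loop starts, so they come back as a single batch, and the loop then exits because the block was completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<int>();

// Five items are already waiting before the consumer looks at the buffer.
for (int i = 0; i < 5; i++)
    buffer.Post(i);
buffer.Complete();

var batchSizes = new List<int>();
// OutputAvailableAsync returns false once the block is completed and empty.
while (await buffer.OutputAvailableAsync())
{
    // TryReceiveAll removes everything currently buffered in one call.
    if (buffer.TryReceiveAll(out var items))
        batchSizes.Add(items.Count);
}

Console.WriteLine($"Batch sizes: {string.Join(", ", batchSizes)}");
```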

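Here is a working example that demonstrates the following:

  • multiple symmetric consumers processing variable-length batches in parallel
  • multiple symmetric producers (not truly running in parallel in this example)
  • the ability to complete the collection once the producers are done
  • to keep the example short, I did not demonstrate the use of a CancellationToken
  • the ability to wait until the producers and/or consumers have finished
  • the ability to call it from a context that doesn't allow async, such as a constructor
  • the Thread.Sleep() calls are not required, but they help simulate some of the processing time that would occur in heavier scenarios
  • both Task.WaitAll() and Thread.Sleep() can optionally be converted to their async equivalents
  • no external libraries are needed
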
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    static void Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        List<Task> consumers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            // copy the loop variable before the lambda captures it
            var num = i;

            // Task.Run unwraps the async lambda, so waiting on this task waits for
            // the whole consumer loop (Task.Factory.StartNew would return a Task<Task>
            // that completes at the first await)
            consumers.Add(Task.Run(async () =>
            {
                while (await buffer.OutputAvailableAsync())
                {
                    if (buffer.TryReceiveAll(out var items))
                        Console.WriteLine($"Consumer {num}: " +
                            items.Aggregate((a, b) => a + ", " + b));

                    // real life processing would take some time
                    await Task.Delay(500);
                }

                Console.WriteLine($"Consumer {num} complete");
            }));

            // give consumer tasks time to activate for a better demo
            Thread.Sleep(100);
        }

        // Kick off producer task(s)
        List<Task> producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            // copy the loop variable so each producer posts its own range
            var pid = i;
            producers.Add(Task.Run(() =>
            {
                for (int j = 1000 * pid; j < 500 + (1000 * pid); j++)
                    buffer.Post(j.ToString());
            }));

            // space out the producers for a better demo
            Thread.Sleep(10);
        }

        // may also use the async equivalent
        Task.WaitAll(producers.ToArray());
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete();

        // may also use the async equivalent
        Task.WaitAll(consumers.ToArray());
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }
}

Here is a simplified, fully async version of the code.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static async Task Main()
    {
        var buffer = new BufferBlock<string>();

        // Kick off consumer task(s)
        var consumers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var id = i;
            consumers.Add(Task.Run(() => StartConsumer(id, buffer)));

            // give consumer tasks time to activate for a better demo
            await Task.Delay(100);
        }

        // Kick off producer task(s)
        var producers = new List<Task>();
        for (var i = 0; i < 3; i++)
        {
            var pid = i;
            producers.Add(Task.Run(() => StartProducer(pid, buffer)));

            // space out the producers for a better demo
            await Task.Delay(10);
        }

        // wait for every producer to finish posting
        await Task.WhenAll(producers);
        Console.WriteLine("Finished waiting on producers");

        // demo being able to complete the collection
        buffer.Complete();

        // wait for every consumer to drain the buffer and leave its loop
        await Task.WhenAll(consumers);
        Console.WriteLine("Finished waiting on consumers");

        Console.ReadLine();
    }

    private static async Task StartConsumer(
        int id,
        IReceivableSourceBlock<string> buffer)
    {
        // OutputAvailableAsync returns false once the block is completed and empty
        while (await buffer.OutputAvailableAsync())
        {
            // take everything currently buffered as one batch
            if (buffer.TryReceiveAll(out var items))
            {
                Console.WriteLine($"Consumer {id}: " +
                    items.Aggregate((a, b) => a + ", " + b));
            }

            // real life processing would take some time
            await Task.Delay(500);
        }

        Console.WriteLine($"Consumer {id} complete");
    }

    private static Task StartProducer(int pid, ITargetBlock<string> buffer)
    {
        for (var j = 1000 * pid; j < 500 + (1000 * pid); j++)
        {
            buffer.Post(j.ToString());
        }

        return Task.CompletedTask;
    }
}
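The answer leaves CancellationToken out for brevity. A minimal sketch of how it could be added, assuming C# 9+ top-level statements: the consumer passes the token to OutputAvailableAsync, so cancelling it stops a consumer that is waiting on a block that is never completed. ConsumeAsync is a hypothetical helper name, and polling buffer.Count only exists to make the demo cancel after the five posted items have been drained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: the answer's consumer loop with a cancellation token.
// Cancelling makes the pending OutputAvailableAsync throw, which ends the loop
// even though Complete() is never called on the block.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<string> source, CancellationToken token)
{
    int received = 0;
    try
    {
        while (await source.OutputAvailableAsync(token))
        {
            if (source.TryReceiveAll(out var items))
                received += items.Count;
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when the token is cancelled; report how much was processed.
    }
    return received;
}

var buffer = new BufferBlock<string>();
using var cts = new CancellationTokenSource();

for (int i = 0; i < 5; i++)
    buffer.Post(i.ToString());

Task<int> consumer = ConsumeAsync(buffer, cts.Token);

// Wait until the consumer has drained the buffer, then cancel instead of completing.
while (buffer.Count > 0)
    await Task.Delay(10);
cts.Cancel();

int total = await consumer;
Console.WriteLine($"Consumer stopped after receiving {total} items");
```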

A similar question, "c# - How to consume a BlockingCollection<T> in batches", can be found on Stack Overflow: https://stackoverflow.com/questions/12260166/
