gpt4 book ai didi

c# - 具有批量生产者的生产者/消费者模式

转载 作者:行者123 更新时间:2023-12-05 00:25:24 27 4
gpt4 key购买 nike

我正在尝试使用多个生产者和一个消费者来实现一个相当简单的生产者/消费者风格的应用程序。

研究使我进入了 BlockingCollection<T>这很有用,并允许我实现一个长期运行的消费者任务,如下所示:

var c1 = Task.Factory.StartNew(() =>
{
var buffer = new List<int>(BATCH_BUFFER_SIZE);

foreach (var value in blockingCollection.GetConsumingEnumerable())
{
buffer.Add(value);
if (buffer.Count == BATCH_BUFFER_SIZE)
{
ProcessItems(buffer);
buffer.Clear();
}
}
});
ProcessItems函数将缓冲区提交到数据库,并且它可以批量工作。然而,这个解决方案是次优的。在男爵生产期间,可能需要一段时间才能填充缓冲区,这意味着数据库已过时。

更理想的解决方案是在 30 秒计时器上运行任务或将 foreach 短路。超时。

我按照计时器的想法运行并想出了这个:
syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);

private static void TimerElapsed(object state)
{
var buffer = new List<int>();
var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();

foreach (var value in collection)
{
buffer.Add(value);
}

ProcessItems(buffer);
buffer.Clear();
}

这有明显的问题, foreach将被阻塞直到结束,从而破坏了计时器的目的。

任何人都可以提供一个方向吗?我基本上需要对 BlockingCollection 进行快照定期和处理内容清除它。也许是 BlockingCollection是错误的类型吗?

最佳答案

而不是使用 GetConsumingEnumerable在计时器回调中,改用这些方法之一,将结果添加到列表中,直到它返回 false或者您已经达到了令人满意的批量大小。

BlockingCollection.TryTake Method (T) - 可能是您需要的,您根本不想执行进一步的等待。

BlockingCollection.TryTake Method (T, Int32)

BlockingCollection.TryTake Method (T, TimeSpan)

您可以轻松地将其提取到扩展中(未经测试):

public static IList<T> Flush<T>
(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
{
// Argument checking.

T next;
var result = new List<T>();

while(result.Count < maxSize && collection.TryTake(out next))
{
result.Add(next);
}

return result;
}

关于c# - 具有批量生产者的生产者/消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24447985/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com