gpt4 book ai didi

c# - 如何在从中删除项目之前在管道 block 中执行任务直到集合已满

转载 作者:太空宇宙 更新时间:2023-11-03 23:31:35 27 4
gpt4 key购买 nike

我有 3 个任务组成一个管道。我希望终结器任务(管道中的最后一个)仅在达到其边界容量后才开始使用来自 buffer2 的数据。现在,它会在每件商品到达时挑选它们,而我不希望这样。

 static void Main(string[] args)
{
List<int> input = Enumerable.Range(0, 20).ToList();

BlockingCollection<int> buffer1 = new BlockingCollection<int>(10);
BlockingCollection<int> buffer2 = new BlockingCollection<int>(5);

Task producer = Task.Factory.StartNew(() => Producer(input, buffer1));
Task consumer = Task.Factory.StartNew(() => Consumer(buffer1, buffer2));
Task finalizer = Task.Factory.StartNew(() => Finalizer(buffer2));

Task.WaitAll(producer,consumer, finalizer);
Console.ReadLine();
}

private static void Producer(List<int> input, BlockingCollection<int> buffer1)
{
foreach (int i in input)
{
buffer1.Add(i);
}

buffer1.CompleteAdding();
}

private static void Consumer(BlockingCollection<int> buffer1, BlockingCollection<int> buffer2)
{
foreach(int i in buffer1.GetConsumingEnumerable())
{
Console.WriteLine("Consumer saw item " + i);

buffer2.Add(i);
}

buffer2.CompleteAdding();
}

private static void Finalizer(BlockingCollection<int> buffer)
{
foreach (int i in buffer.GetConsumingEnumerable())
{
// Do some work
Console.WriteLine("Finalizer saw item " + i);
System.Threading.Thread.Sleep(1000);
}

buffer.CompleteAdding();
}

如何让终结器在 buffer2 中有 5 个项目之前停止消耗它中的项目?消费者也一样,只有当缓冲区 1 至少有 10 个项目时,它才应该开始消费。

最佳答案

也许你应该看看TPL Dataflow from Microsoft

我已经对您的代码进行了一些更改以适应 TPL

static void Main(string[] args)
{
List<int> input = Enumerable.Range(0, 20).ToList();

BatchBlock<int> buffer1 = new BatchBlock<int>(10);
BatchBlock<int> buffer2 = new BatchBlock<int>(5);
ActionBlock<int[]> action1;
ActionBlock<int[]> action2;

action1 = new ActionBlock<int[]>(t => { Consumer(t, buffer2); },
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

buffer1.LinkTo(action1, new DataflowLinkOptions { PropagateCompletion = true });

action2 = new ActionBlock<int[]>(t => Finalizer(t),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

buffer2.LinkTo(action2, new DataflowLinkOptions { PropagateCompletion = true });

Task produceTask = Task.Factory.StartNew(() => Producer(input, buffer1));

Task.WaitAll(produceTask);
action1.Completion.Wait();//Will add all the items to buffer2
buffer2.Complete();//will not get any new items
action2.Completion.Wait();//Process the batch of 5 and then complete
Console.WriteLine("Process complete");
Console.ReadLine();
}

private static void Finalizer(int[] t)
{
Console.WriteLine("Received a batch of items {0}", t.Count());
foreach (int i in t)
{
// Do some work
Console.WriteLine("Finalizer saw item " + i);
System.Threading.Thread.Sleep(1000);
}
}

private static void Consumer(int[] t, BatchBlock<int> buffer2)
{
foreach (var item in t)
{
Console.WriteLine("Consumer saw item " + item);
buffer2.Post(item);
}

}

public static void Producer(List<int> input, BatchBlock<int> buffer1)
{
foreach (int i in input)
{
buffer1.Post(i);
}

//Once marked complete your entire data flow will signal a stop for
// all new items
buffer1.Complete();

}

Here是我用的nuget包

编辑 更新了上面的代码以在消费时取最小计数 10。至于你怀疑当我们有偏斜分布时它是否符合要求那么答案是

关于c# - 如何在从中删除项目之前在管道 block 中执行任务直到集合已满,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32020459/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com