
c# - Why does my dataflow complete before all the async calls from the BufferBlock have been processed?

Reposted. Author: 太空宇宙. Updated: 2023-11-03 15:35:19

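I have a dataflow set up as follows.

1. A task that reads a text file line by line and posts the lines to a BatchBlock<string>(chunkSize), which groups them into chunks.

2. An ActionBlock linked to the BatchBlock above that partitions each chunk into batches and posts them to a BufferBlock.

3. A TransformBlock linked to the BufferBlock that spawns an async task for each batch.

4. The process finishes when all of the spawned async calls have completed.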
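Steps 1 and 4 rely on two dataflow behaviours: a BatchBlock emits any leftover items as a smaller final batch when it is completed, and PropagateCompletion carries Complete() down the links so that awaiting the last block's Completion waits for everything upstream. The following is a minimal, self-contained sketch of just those two behaviours; the class BatchCompletionDemo and its RunAsync method are hypothetical names for illustration, and it assumes the System.Threading.Tasks.Dataflow library is available (it ships with current .NET; .NET Framework projects use the NuGet package of the same name).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Illustrative sketch (hypothetical helper, not from the question):
// BatchBlock groups posted items into fixed-size arrays, and
// PropagateCompletion forwards Complete() from the head to the tail block.
public static class BatchCompletionDemo
{
    // Posts 'itemCount' integers into a BatchBlock of size 'chunkSize' and
    // returns the length of every chunk the downstream ActionBlock received.
    public static async Task<List<int>> RunAsync(int itemCount, int chunkSize)
    {
        var chunkSizes = new List<int>();

        var chunkBlock = new BatchBlock<int>(chunkSize);

        // Default MaxDegreeOfParallelism is 1, so only one thread touches the list at a time.
        var consumer = new ActionBlock<int[]>(chunk => chunkSizes.Add(chunk.Length));

        chunkBlock.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
        {
            chunkBlock.Post(i);
        }

        // Completing the head flushes the partial last chunk and, through the
        // link, eventually completes the consumer as well.
        chunkBlock.Complete();

        // Awaiting only the tail is enough: it completes after every chunk has been handled.
        await consumer.Completion;
        return chunkSizes;
    }
}
```

With 7 items and a chunk size of 3, the consumer should see chunks of 3, 3 and 1 items, and only the tail block's Completion needs to be awaited.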

The following code does not work as expected: it completes before all of the batches have been processed. What am I missing?

private static void DataFlow(string filePath, int chunkSize, int batchSize)
{
    int chunkCount = 0;
    int batchCount = 0;

    BatchBlock<string> chunkBlock = new BatchBlock<string>(chunkSize);
    BufferBlock<IEnumerable<string>> batchBlock = new BufferBlock<IEnumerable<string>>();

    Task produceTask = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(filePath))
        {
            chunkBlock.Post(line);
        }

        Console.WriteLine("Finished producing");
        chunkBlock.Complete();
    });

    var makeBatches = new ActionBlock<string[]>(t =>
    {
        Console.WriteLine("Got a chunk " + ++chunkCount);

        // Partition each chunk into smaller chunks grouped on the first column
        var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);

        // Further break down the chunks into batch-size groups
        var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

        // Get batches from groups
        var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

        foreach (var batch in batches)
        {
            batchBlock.Post(batch);
        }

        batchBlock.Complete();

    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    chunkBlock.LinkTo(makeBatches, new DataflowLinkOptions { PropagateCompletion = true });

    var executeBatches = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async b =>
    {
        Console.WriteLine("Got a batch " + ++batchCount);
        await ExecuteBatch(b);
        return b;

    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    batchBlock.LinkTo(executeBatches, new DataflowLinkOptions { PropagateCompletion = true });

    var finishBatches = new ActionBlock<IEnumerable<string>>(b =>
    {
        Console.WriteLine("Finished executing batch " + batchCount);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    executeBatches.LinkTo(finishBatches, new DataflowLinkOptions { PropagateCompletion = true });

    Task.WaitAll(produceTask);
    Console.WriteLine("Production complete");

    makeBatches.Completion.Wait();
    Console.WriteLine("Making batches complete");

    executeBatches.Completion.Wait();
    Console.WriteLine("Executing batches complete");

    Task.WaitAll(finishBatches.Completion);

    Console.WriteLine("Process complete with total chunks " + chunkCount + " and total batches " + batchCount);
    Console.ReadLine();
}

// async task to simulate network I/O
private static async Task ExecuteBatch(IEnumerable<string> batch)
{
    Console.WriteLine("Executing batch ");
    await Task.Run(() => System.Threading.Thread.Sleep(2000));
}
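The LINQ inside makeBatches (group a chunk on its first column, then cut each group into runs of at most batchSize lines) can be checked on its own, independently of the dataflow. The sketch below restates those steps as one query; ChunkPartitioner and ToBatches are hypothetical names, and materializing each batch into a List<string> is a choice made here so the result is easy to inspect, not something the original code does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper reproducing the partitioning steps from makeBatches.
public static class ChunkPartitioner
{
    public static List<List<string>> ToBatches(IEnumerable<string> chunk, int batchSize)
    {
        return chunk
            // 1. Group lines on their first comma-separated column
            //    (keys come out in order of first appearance).
            .GroupBy(line => line.Split(',')[0])
            // 2. Within each group, number the lines and cut them into
            //    runs of at most batchSize lines (index / batchSize).
            .SelectMany(group => group
                .Select((line, index) => new { line, index })
                .GroupBy(x => x.index / batchSize, x => x.line))
            // 3. Materialize each batch so it can be inspected or reused.
            .Select(batch => batch.ToList())
            .ToList();
    }
}
```

For the chunk "a,1", "a,2", "b,1", "a,3", "b,2" with a batch size of 2, this yields three batches: [a,1 a,2], [a,3] and [b,1 b,2]. A batch never mixes first-column keys, and a key with more than batchSize lines is split across several batches.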

Best answer

chunkBlock invokes makeBatches once for every chunk, and you are calling batchBlock.Complete() inside makeBatches, so after the first chunk has been processed, batchBlock no longer accepts new posts.
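Once a BufferBlock has been completed it declines every later message, so DataflowBlock.Post simply returns false; the question's code ignores that return value, so the batches from every chunk after the first are dropped silently. One way to avoid calling Complete() by hand is to replace the ActionBlock + BufferBlock pair with a TransformManyBlock, which forwards every batch it yields and completes on its own once its source has completed and all chunks are processed. The following is a minimal sketch of that restructuring, not the answerer's code: FixedDataFlow, RunAsync and the 10 ms Task.Delay standing in for ExecuteBatch are assumptions for illustration, and the console logging is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical restructuring of the question's pipeline.
public static class FixedDataFlow
{
    // Returns the number of batches that reached the final block.
    public static async Task<int> RunAsync(IEnumerable<string> lines, int chunkSize, int batchSize)
    {
        int finishedBatches = 0;
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        var chunkBlock = new BatchBlock<string>(chunkSize);

        // Replaces ActionBlock + BufferBlock: no manual Complete() call is
        // needed, because completion arrives from chunkBlock through the link.
        var makeBatches = new TransformManyBlock<string[], IEnumerable<string>>(chunk =>
            chunk.GroupBy(line => line.Split(',')[0])
                 .SelectMany(group => group
                     .Select((line, index) => new { line, index })
                     .GroupBy(x => x.index / batchSize, x => x.line))
                 .Select(batch => (IEnumerable<string>)batch.ToList()));

        var executeBatches = new TransformBlock<IEnumerable<string>, IEnumerable<string>>(async batch =>
        {
            await ExecuteBatch(batch);
            return batch;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        // Interlocked keeps the counter safe even if this block's parallelism is raised later.
        var finishBatches = new ActionBlock<IEnumerable<string>>(batch =>
        {
            Interlocked.Increment(ref finishedBatches);
        });

        chunkBlock.LinkTo(makeBatches, propagate);
        makeBatches.LinkTo(executeBatches, propagate);
        executeBatches.LinkTo(finishBatches, propagate);

        foreach (var line in lines)
        {
            await chunkBlock.SendAsync(line);
        }
        chunkBlock.Complete();

        // Completion flows down the links; the tail finishes only after every batch has run.
        await finishBatches.Completion;
        return finishedBatches;
    }

    // Stand-in for the question's simulated network I/O (shorter delay, assumed).
    private static Task ExecuteBatch(IEnumerable<string> batch) => Task.Delay(10);
}
```

With the seven lines a,1 a,2 b,1 a,3 b,2 c,1 c,2, a chunk size of 3 and a batch size of 2, this sketch should count 6 batches, whereas the question's version would stop after the 2 batches from the first chunk. If you would rather keep the ActionBlock and BufferBlock, a smaller change is to delete batchBlock.Complete() from the action and complete batchBlock from a continuation such as makeBatches.Completion.ContinueWith(t => batchBlock.Complete()); a fuller version would fault batchBlock instead when t.IsFaulted.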

Regarding "c# - Why does my dataflow complete before all the async calls from the BufferBlock have been processed?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32039002/
