
c# - How do I perform async operations in TPL Dataflow for best performance?

Reposted · Author: 太空狗 · Updated: 2023-10-29 19:45:10

I wrote the method below to batch-process a huge CSV file. The idea is to read a chunk of lines from the file into memory, then partition that chunk into fixed-size batches. Once the partitions are obtained, they are sent to a server (synchronously or asynchronously), which may take a while.

private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
    List<string> chunk = new List<string>(chunkSize);

    foreach (var line in File.ReadLines(filePath))
    {
        if (chunk.Count == chunk.Capacity)
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);

            // Further break down the chunks into batch-size groups
            var groups = partitions.Select(x => x.Select((i, index) =>
                new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

            // Get batches from groups
            var batches = groups.SelectMany(x => x)
                .Select(y => y.Select(z => z)).ToList();

            // Process all batches asynchronously
            batches.AsParallel().ForAll(async b =>
            {
                WebClient client = new WebClient();
                byte[] bytes = System.Text.Encoding.ASCII
                    .GetBytes(b.SelectMany(x => x).ToString());
                await client.UploadDataTaskAsync("myserver.com", bytes);
            });

            // clear the chunk
            chunk.Clear();
        }

        chunk.Add(line);
    }
}

This code does not seem very efficient, for two reasons:

  1. The main thread reading the CSV file is blocked until all the partitions have been processed.

  2. AsParallel blocks until all the tasks are done. So if there are more threads available in the thread pool to do work, I am not using them, because the number of tasks is bounded by the number of partitions.
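As an aside, the second point may not hold the way it is stated: `ForAll` takes an `Action<T>`, so the `async` lambda in the code above compiles to `async void`, and `ForAll` returns as soon as every lambda reaches its first `await` rather than when the uploads finish. A minimal sketch of that behavior (the delay is a stand-in for the upload):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int completed = 0;

// ForAll takes an Action<T>, so an async lambda becomes async void:
// ForAll returns as soon as every lambda hits its first await.
Enumerable.Range(0, 4).AsParallel().ForAll(async i =>
{
    await Task.Delay(500);                 // stand-in for the upload
    Interlocked.Increment(ref completed);
});

Console.WriteLine(completed); // 0 - none of the "uploads" have finished yet
```

This means exceptions from the uploads are also unobservable, which is one more reason to move the async work into a dataflow block that tracks its tasks.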

batchSize is fixed and cannot be changed, but chunkSize is tunable for performance. I can choose a chunkSize large enough that the number of batches created >> the number of threads available in the system, but that still means the Parallel.ForEach method blocks until all the tasks are done.

How can I change the code so that all available threads in the system are used to do the work, with none sitting idle? I am thinking I could use a BlockingCollection to store the batches, but I am not sure what capacity to give it, since the number of batches per chunk is dynamic.

Any ideas on how to use TPL to maximize thread utilization, so that most of the available threads on the system are always doing something?

Update: here is what I have so far using TPL Dataflow. Is this correct?

private static void UploadData(string filePath, int chunkSize, int batchSize)
{
    var buffer1 = new BatchBlock<string>(chunkSize);
    var buffer2 = new BufferBlock<IEnumerable<string>>();

    var action1 = new ActionBlock<string[]>(t =>
    {
        Console.WriteLine("Got a chunk of lines " + t.Count());

        // Partition each chunk into smaller chunks grouped on column 1
        var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);

        // Further break down the chunks into batch-size groups
        var groups = partitions.Select(x => x.Select((i, index) =>
            new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

        // Get batches from groups
        var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

        foreach (var batch in batches)
        {
            buffer2.Post(batch);
        }

    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    buffer1.LinkTo(action1, new DataflowLinkOptions
        { PropagateCompletion = true });

    var action2 = new TransformBlock<IEnumerable<string>,
        IEnumerable<string>>(async b =>
    {
        await ExecuteBatch(b);
        return b;

    }, new ExecutionDataflowBlockOptions
        { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    buffer2.LinkTo(action2, new DataflowLinkOptions
        { PropagateCompletion = true });

    var action3 = new ActionBlock<IEnumerable<string>>(b =>
    {
        Console.WriteLine("Finished executing a batch");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    action2.LinkTo(action3, new DataflowLinkOptions
        { PropagateCompletion = true });

    Task produceTask = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(filePath))
        {
            buffer1.Post(line);
        }

        // Once marked complete, the entire dataflow will signal a stop
        // for all new items
        Console.WriteLine("Finished producing");
        buffer1.Complete();
    });

    Task.WaitAll(produceTask);
    Console.WriteLine("Produced complete");

    action1.Completion.Wait(); // Will add all the items to buffer2
    Console.WriteLine("Action1 complete");

    buffer2.Complete();        // will not get any new items
    action2.Completion.Wait(); // Process the batches and then complete

    action3.Completion.Wait();

    Console.WriteLine("Process complete");
    Console.ReadLine();
}

Best Answer

You are close. In TPL Dataflow, data flows from one block to another, and you should try to keep to that paradigm. So, for example, action1 should be a TransformManyBlock, because an ActionBlock is an ITargetBlock (i.e., a terminating block).

When you specify PropagateCompletion on a link, the completion event is automatically routed through the blocks, so you only need to do one Wait() on the last block.

Think of it as a domino chain: you call Complete on the first block, and it propagates through the chain to the last block.

You should also consider what you are multithreading and why. Your example is heavily I/O-bound, and I don't think tying up a bunch of threads to wait for I/O completion is the right solution.

Finally, be mindful of what blocks and what doesn't. In your example, buffer1.Post(...) is not a blocking call, so there is no reason to run it inside a task.
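The domino effect described above can be shown with a minimal, self-contained sketch (block names and values are illustrative; `System.Threading.Tasks.Dataflow` ships as a NuGet package). Completing the first block is enough; only the last block needs to be awaited:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var link = new DataflowLinkOptions { PropagateCompletion = true };
var results = new List<int>();

// A two-block chain: transform, then consume.
var first = new TransformBlock<int, int>(x => x * 2);
var last = new ActionBlock<int>(x => results.Add(x));
first.LinkTo(last, link);

for (int i = 0; i < 3; i++)
    first.Post(i);

// Completing the first block propagates through the link,
// so waiting on the last block is enough.
first.Complete();
await last.Completion;

Console.WriteLine(string.Join(",", results)); // 0,2,4
```

Without PropagateCompletion on the link, `last.Completion` would never finish, which is exactly the situation the update's code works around by manually completing and waiting on each block.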

I wrote the following sample code that uses TPL Dataflow:

static void Main(string[] args)
{
    var filePath = "C:\\test.csv";
    var chunkSize = 1024;
    var batchSize = 128;

    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var uploadData = new ActionBlock<IEnumerable<string>>(
        async (data) =>
        {
            WebClient client = new WebClient();
            var payload = data.SelectMany(x => x).ToArray();
            byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload);
            //await client.UploadDataTaskAsync("myserver.com", bytes);
            await Task.Delay(2000);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ });

    var lineBuffer = new BatchBlock<string>(chunkSize);

    var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
        (data) =>
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = data.GroupBy(c => c.Split(',')[0]);

            // Further break down the chunks into batch-size groups
            var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

            // Get batches from groups
            var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

            // Don't forget to enumerate before returning
            return batches.ToList();
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    lineBuffer.LinkTo(splitData, linkCompletion);
    splitData.LinkTo(uploadData, linkCompletion);

    foreach (var line in File.ReadLines(filePath))
    {
        lineBuffer.Post(line);
    }
    lineBuffer.Complete();

    // Wait for uploads to finish
    uploadData.Completion.Wait();
}
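One refinement worth considering beyond the answer above: with an unbounded pipeline and a slow uploader, the whole file can end up buffered in memory. Giving the blocks a BoundedCapacity and switching the producer from Post to SendAsync makes the reader wait when the buffers fill, so the uploads apply backpressure to the file reader. A sketch under those assumptions (generated lines and a Task.Delay stand in for the file and the upload):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

int chunkSize = 100;

// Bounded buffers: when they fill up, SendAsync below waits
// instead of letting memory grow with the whole file.
var lineBuffer = new BatchBlock<string>(chunkSize,
    new GroupingDataflowBlockOptions { BoundedCapacity = chunkSize * 2 });

int processed = 0;
var uploadData = new ActionBlock<string[]>(async chunk =>
{
    await Task.Delay(10);                  // stand-in for the real upload
    processed += chunk.Length;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

lineBuffer.LinkTo(uploadData, new DataflowLinkOptions { PropagateCompletion = true });

// Producer: SendAsync asynchronously waits when the pipeline is full,
// whereas Post would simply return false and drop the line.
foreach (var line in Enumerable.Range(0, 1000).Select(i => $"{i % 7},{i}"))
    await lineBuffer.SendAsync(line);

lineBuffer.Complete();
await uploadData.Completion;
Console.WriteLine(processed); // 1000
```

Note that a BatchBlock's BoundedCapacity must be at least its batch size, and that with bounded blocks a rejected Post is a real possibility, which is why the producer uses SendAsync.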

For c# - How to perform async operations in TPL Dataflow for best performance?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32031783/
