gpt4 book ai didi

c# - 在 C# .NET 中管理多线程,控制每个操作的线程数

转载 作者:行者123 更新时间:2023-12-03 12:58:40 25 4
gpt4 key购买 nike

认为我给出的标题有点令人困惑,但很难表达我想要问的内容。

基本上,我正在使用 .NET 在 C# 中编写一个程序,该程序使用 Google 云 API 来上传数据。

我试图以一种有效的方式做到这一点,并成功地使用了 parallel.foreach,但我需要更好的控制。我将要上传的文件收集到一个列表中,我想按文件大小对这些文件进行排序,然后将其拆分为 3 个大小相同(以千兆字节计,而不是文件数)的列表。

其中一个列表将包含总上传大小的三分之一,但由最大的文件(以千兆字节为单位)组成,因此文件数最少,下一个列表将与第一个列表的总千兆字节相同,但包含更多数量的小文件,最后一个列表将包含许多小文件,但也应该与其他子列表的大小相同。

然后我想为上传过程分配一定数量的线程。 (例如,我希望最大的文件列表分配 5 个线程,中间分配 3 个线程,小文件列表仅分配 2 个线程。)是否可以将这 3 个列表设置为并行迭代,同时控制如何分配了多少线程?

这样做的最佳方法是什么?

最佳答案

Parallel.ForEachPLINQ用于数据并行 - 使用多个内核处理大块数据。它适用于您在内存中有 1GB 数据(或非常快的 IEnumerable 源)并希望使用所有内核处理它的情况。在这种情况下,您需要将数据划分为独立的块,并让一个 worker 一次 crunch 一次,以限制同步开销。

您所描述的是大量文件的并发上传。那是纯粹的 IO,而不是数据并行性。大部分时间将用于从磁盘加载数据或将其写入网络。这是 Task.Run 的工作和 async/await .要同时上传多个文件,您可以使用 ActionBlockChannel将文件排队并异步上传。对于 channel ,您必须编写一些工作程序样板,但您可以获得更大的控制权,尤其是在您想要使用相同客户端实例进行多次调用的情况下。 ActionBlock 本质上是无状态的。

最后,您根据大小描述具有不同 DOP 的队列,当您同时拥有大文件和小文件时,这是一个非常好的主意。您可以通过使用多个 ActionBlock 实例(每个实例具有不同的 DOP)或多个 Channel worker(每个具有不同的 DOP)来实现此目的。

数据流

假设您已经有一个按路径名上传文件的方法:

//Adopted from the Google SDK example
async Task UploadFile(DriveService service,FileInfo file)
{
var fileName=Path.GetFileName(filePath);

using var uploadStream = file.OpenRead();
var request insertRequest = service.Files.Insert(
new File { Title = file.Name },
uploadStream,
"image/jpeg");

await insert.UploadAsync();
}

您可以创建三个不同的 ActionBlock 实例,每个实例都有不同的 DOP :
var small=new ActionBlock<FileInfo>(
file=>UploadFile(service,file),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 15
});
var medium=new ActionBlock<FileInfo>(
file=>UploadFile(service,file),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
});

var big=new ActionBlock<FileInfo>(
path=>UploadFile(service,file),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2
});

并根据大小将不同的文件发布到不同的块:
var directory=new DirectoryInfo(...);
var files=directory.EnumerateFiles(...);
foreach(var file in files)
{
switch (file.Length)
{
case int x when x < 1024:
small.Post(file);
break;
case int x when x < 10240:
medium.Post(file);
break;
default:
big.Post(file);
break;
}
}

或者,在 C# 8 中:
foreach(var file in files)
{

var block = file.Length switch {
long x when x < 1024 => small,
long x when x < 10240=> medium,
_ => big
};
block.Post(file)
}

当迭代完成时,我们需要通过调用 Complete() 来告诉我们已经完成的块。在每一个上等待所有的人完成:
small.Complete();
medium.Complete();
big.Complete();

await Task.WhenAll(small.Completion, medium.Completion, big.Completion);

关于c# - 在 C# .NET 中管理多线程,控制每个操作的线程数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60450145/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com