gpt4 book ai didi

c# - 设置对 azure DownloadToStreamAsync 接收的并行任务的限制

转载 作者:太空宇宙 更新时间:2023-11-03 10:50:39 26 4
gpt4 key购买 nike

我有一堆文件(大约 10k)需要从 Windows Azure 存储下载。为了让它们并行下载而不是一次下载一个,我使用了 blob DownloadToStreamAsync 方法,该方法返回一个 Task 对象。然后,我使用将流保存到文件的方法设置任务ContinueWith。

这是代码:

foreach (var File in ServerFiles)
{
string sFileName = File.Uri.LocalPath.ToString();
CloudBlockBlob oBlob = BiActionscontainer.GetBlockBlobReference(sFileName.Replace("/" + Container + "/", ""));

MemoryStream ms = new MemoryStream();
BlobRequestOptions f = new BlobRequestOptions();
Task downloadTask = oBlob.DownloadToStreamAsync(ms);

downloadTask.ContinueWith((Task task) =>
{
ms.Position = 0;
lock(lockObject)
{
using (FileStream file = new FileStream(ResultPath, FileMode.Append, FileAccess.Write))
{
byte[] bytes = ms.ToArray();
file.Write(bytes, 0, bytes.Length);
}
}
ms.Dispose();
});
}

此代码是在我们的服务器之一(而不是 Azure 上)- Windows 2003 服务器上运行的工具的一部分。问题是,在该服务器上,我得到“操作已超时。Windows 2003 标准上的 Microsoft.WindowsAzure.Storage”,所以我认为可能是很多文件同时发出请求并阻塞了带宽.

所以我想知道,在从第三方库获取任务对象的情况下,如何限制一次运行的并行数量?仍然对剩余的任务进行排队?

最佳答案

您可以使用SemaphoreSlim来实现此目的。设置您想要的并发请求数,然后在开始每个请求之前使用 await WaitAsync(),在每个请求完成后使用 Release(),最后等待剩下的任务。

封装在辅助方法中,它可能如下所示:

public static async Task ForEachAsync<T>(
this IEnumerable<T> items, Func<T, Task> action, int maxDegreeOfParallelism)
{
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);

var tasks = new List<Task>();

foreach (var item in items)
{
await semaphore.WaitAsync();

Func<T, Task> loopAction = async x =>
{
await action(x);
semaphore.Release();
};

tasks.Add(loopAction(item));
}

await Task.WhenAll(tasks);
}

用法(对代码进行一些更改,主要是为了简化它并使其更加异步):

ServerFiles.ForEachAsync(async file =>
{
string sFileName = File.Uri.LocalPath.ToString();
CloudBlockBlob oBlob = BiActionscontainer.GetBlockBlobReference(sFileName.Replace("/" + Container + "/", ""));

var ms = new MemoryStream();
BlobRequestOptions f = new BlobRequestOptions();
await oBlob.DownloadToStreamAsync(ms);

ms.Position = 0;
lock (lockObject)
{
using (var file = new FileStream(ResultPath, FileMode.Append, FileAccess.Write))
{
await ms.CopyToAsync(file);
}
}
});

另一种实现方式是使用 TPL Dataflow 中的 ActionBlock。它知道如何完成此处所需的所有操作,您只需进行设置即可:

public static Task ForEachAsync<T>(
this IEnumerable<T> items, Func<T, Task> action, int maxDegreeOfParallelism)
{
var block = new ActionBlock<T>(
action,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
});

foreach (var item in items)
{
block.Post(item);
}

block.Complete();
return block.Completion;
}

关于c# - 设置对 azure DownloadToStreamAsync 接收的并行任务的限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21525580/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com