gpt4 book ai didi

c# - 使用适用于 .NET 的 AWS S3 SDK 从 Amazon S3 下载并行批处理文件

转载 作者:可可西里 更新时间:2023-11-01 08:05:54 26 4
gpt4 key购买 nike

问题:我想使用他们的 .NET SDK 从 AWS S3 并行下载 100 个文件。下载的内容应该存储在 100 个内存流中(文件足够小,我可以从那里拿)。我对 Task、IAsyncResult、Parallel.* 和 .NET 4.0 中的其他不同方法感到困惑。

如果我尝试自己解决问题,我会突然想到像这样的伪代码:(编辑以向某些变量添加类型)

using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;


// Prepare to launch requests
var asyncRequests = from rq in requestObjects
select _s3.BeginGetObject(rq,null,null);

// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();

// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched
select _s3.EndGetRequest(rq);

// Finish requests
var actualResponses = responses.ToList();

// Fetch data
var data = actualResponses.Select(rp => {
var ms = new MemoryStream();
rp.ResponseStream.CopyTo(ms);
return ms;
});

此代码并行启动 100 个请求,这很好。但是,有两个问题:

  1. 最后一条语句将串行下载文件,而不是并行下载。流中似乎没有 BeginCopyTo()/EndCopyTo() 方法...
  2. 在所有请求都响应之前,前面的语句不会放行。换句话说,在所有文件开始之前,所有文件都不会开始下载。

所以我开始觉得我走错了路......

帮忙吗?

最佳答案

如果您将操作分解为一个方法,该方法将异步处理一个请求,然后调用它 100 次,这可能会更容易。

首先,让我们确定您想要的最终结果。由于您将要使用的是 MemoryStream 这意味着您将要返回 Task<MemoryStream> 从你的方法。签名看起来像这样:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
GetObjectRequest request)

因为你的 AmazonS3对象实现了 Asynchronous Design Pattern , 您可以使用 FromAsync method TaskFactory class 上生成 Task<T>来自实现异步设计模式的类,如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
GetObjectRequest request)
{
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null);

// But what goes here?

所以你已经在一个好地方,你有一个 Task<T>您可以等待或在调用完成时获得回调。但是,您需要以某种方式翻译 GetObjectResponse从对 Task<GetObjectResponse> 的调用返回进入 MemoryStream .

为此,您想使用 ContinueWith methodTask<T> 上类(class)。将其视为 Select method 的异步版本在 Enumerable class 上, 它只是对另一个 Task<T> 的投影除了每次你调用ContinueWith ,您可能正在创建一个运行代码部分的新任务。

这样,您的方法如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
GetObjectRequest request)
{
// Start the task of downloading.
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null
);

// Translate.
Task<MemoryStream> translation = response.ContinueWith(t => {
using (Task<GetObjectResponse> resp = t ){
var ms = new MemoryStream();
t.Result.ResponseStream.CopyTo(ms);
return ms;
}
});

// Return the full task chain.
return translation;
}

请注意,在上面您可以调用 overload of ContinueWith 路过 TaskContinuationOptions.ExecuteSynchronously ,看起来您所做的工作很少(我不知道,响应可能巨大)。如果您所做的工作非常少,而为了完成工作而开始一项新任务是有害的,您应该通过 TaskContinuationOptions.ExecuteSynchronously。这样您就不会浪费时间为最少的操作创建新任务。

现在您有了可以将一个 请求转换为 Task<MemoryStream> 的方法,创建一个将处理任何个它们的包装器很简单:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
// Just call Select on the requests, passing our translation into
// a Task<MemoryStream>.
// Also, materialize here, so that the tasks are "hot" when
// returned.
return requests.Select(r => GetMemoryStreamAsync(s3, r)).
ToArray();
}

在上面,您只需获取 GetObjectRequest 的序列实例,它将返回 Task<MemoryStream> 的数组.它返回具体化序列这一事实很重要。如果您在返回之前没有具体化它,那么在迭代序列之前不会创建任务。

当然,如果您想要这种行为,那么一定要删除对 .ToArray() 的调用。 , 让方法返回 IEnumerable<Task<MemoryStream>>然后将在您遍历任务时发出请求。

从那里,您可以一次处理一个(在循环中使用 Task.WaitAny method)或等待所有它们完成(通过调用 Task.WaitAll method)。后者的一个例子是:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
IEnumerable<GetObjectRequest> requests)
{
Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
Task.WaitAll(tasks);
return tasks.Select(t => t.Result).ToList();
}

此外,应该提到的是,这非常适合 Reactive Extensions framework ,因为这非常非常适合 IObservable<T> 实现。

关于c# - 使用适用于 .NET 的 AWS S3 SDK 从 Amazon S3 下载并行批处理文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10486822/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com