gpt4 book ai didi

c# - 如何为 foreach 创建并行预取

转载 作者:行者123 更新时间:2023-11-30 15:45:10 24 4
gpt4 key购买 nike

鉴于在 C#、TPL、并行扩展、异步 CTP、响应式扩展中执行异步操作的众多新方法,我想知道并行化以下获取和处理部分的最简单方法是什么:

foreach(string url in urls)
{
var file = FetchFile(url);
ProcessFile(file);
}

条件是虽然文件可以随时获取,ProcessFile 一次只能处理一个文件,应该按顺序调用。

简而言之,让 FetchFileProcessFile 以流水线方式运行(即同时发生)的最简单方法是什么?

最佳答案

这是 RX 方式。此扩展会将 uri 的流转换为流的流:

    public static IObservable<Stream> RequestToStream(this IObservable<string> source, 
TimeSpan timeout)
{
return
from wc in source.Select(WebRequest.Create)
from s in Observable
.FromAsyncPattern<WebResponse>(wc.BeginGetResponse,
wc.EndGetResponse)()
.Timeout(timeout, Observable.Empty<WebResponse>())
.Catch(Observable.Empty<WebResponse>())
select s.GetResponseStream();
}

用法:

new [] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Do(stream = > ProcessStream(stream))
.Subscribe();

编辑:糟糕,没有注意到文件写入序列化要求。这部分可以通过使用 .Concat 来完成,它本质上是一个 RX 队列(另一个是 .Zip)

让我们有一个 .StreamToFile 扩展名:

    public static IObservable<Unit> StreamToFile(this Tuple<Stream, string> source)
{
return Observable.Defer(() =>
source.Item1.AsyncRead().WriteTo(File.Create(source.Item2)));
}

现在您可以让 Web 请求并行,但序列化来自它们的文件写入:

        new[] { "myuri.net\file1.dat", "myuri.net\file2.dat" }
.ToObservable()
.RequestToStream(TimeSpan.FromSeconds(5))
.Select((stream, i) => Tuple.Create(stream, i.ToString() + ".dat"))
.Select(x => x.StreamToFile())
.Concat()
.Subscribe();

关于c# - 如何为 foreach 创建并行预取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5460764/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com