gpt4 book ai didi

c# - 以最大并行度将 IEnumerable 转换为 IObservable

转载 作者:太空狗 更新时间:2023-10-30 00:40:41 25 4
gpt4 key购买 nike

我有一系列异步任务要做(比如,获取 N 个网页)。现在我想要的是将它们全部公开为 IObservable<T> 。我当前的解决方案使用 this question 的答案:

async Task<ResultObj> GetPage(string page) {
Console.WriteLine("Before");
var result = await FetchFromInternet(page);
Console.WriteLine("After");
return result;
}

// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable =pages.Select(GetPage).
Select(t => Observable.FromAsync(() => t)).Merge();

// Now consume the list
foreach(ResultObj obj in resultObservable.ToEnumerable()) {
Console.WriteLine(obj.ToString());
}

问题是我不知道要获取的页面数量,而且可能很大。我不想同时发出数百个请求。所以我想要一种方法来限制将并行执行的最大任务数。有没有办法限制并发调用 GetPage 的数量?

有一个 Merge 重载,它接受一个 maxConcurrent 参数,但它似乎并没有真正限制函数调用的并发。控制台在 After 消息之前打印所有 Before 消息。

注意:我需要转换回 IEnumerable<T> 。我正在为一个系统编写一个数据源,该系统为我提供要获取的数据描述符,我需要将已下载数据的列表还给它。

最佳答案

编辑

以下应该有效。 This overload限制并发订阅的数量。

var resultObservable = pages
.Select(p => Observable.FromAsync(() => GetPage(p)))
.Merge(maxConcurrent);

解释

为了理解为什么需要进行此更改,我们需要一些背景知识

  1. FromAsync 返回一个将调用传递的 Func 的可观察对象 every time it is subscribed to .这意味着如果从未订阅可观察对象,则永远不会调用它。

  2. Merge 急切地读取源序列,并且只同时订阅 n 个可观察对象。

通过这两部分我们可以知道为什么原始版本将并行执行所有内容:因为 (2),GetPage 将在 时为所有源字符串调用Merge 决定需要订阅多少个 observable。

我们还可以看到为什么第二个版本有效:即使序列已经完全迭代,(1) 意味着 GetPageMerge 决定之前不会被调用它需要订阅 n observables。这导致仅 n 任务同时执行的预期结果。

关于c# - 以最大并行度将 IEnumerable<T> 转换为 IObservable<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25436542/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com