- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有以下代码,它从 REST 分页 API 中提取数据。
当使用响应式(Reactive)扩展时,它接近下载的结尾(即已知目标 1,653 中的第 1,636 页,它获得的确切计数取决于具有较高并发性的并发提取,从而导致已知目标的页面计数较低)。然后我的接收函数抛出一个 OperationCancelled
异常(但是我从未设置我的取消 token 源)。
就像 Select
正在以某种方式取消我的功能,但仅在分页请求接近尾声时,或者 observable 终止然后终止我的 observable,我认为(但对 rx.net 来说是新手)。
这也不是速率限制问题,一次下载一次即可(MaxConcurrentDownloads
设置为 1)。
有什么想法我做错了什么吗?
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
var query = Enumerable
.Range(2, pages - 1)
.ToObservable()
.Select(page => Observable.FromAsync(() =>
{
return api
.GetTickersAsync(BatchSize, page, this.cts.Token)
.ContinueWith( x => new TickersResponseWithPage(page, x.Result));
}))
.Merge(MaxConcurrentDownloads);
query.Subscribe((response) =>
{
this.logger.LogInformation($"adding {response.TickersResponse.Tickers.Length} records from page {response.Page}");
list.AddRange(response.TickersResponse.Tickers);
});
await query.ToTask(this.cts.Token);
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count:n0} in {duration.Humanize()}");
如果需要附加信息,顺序测试证明对 API 的调用是正常的并返回所有 1,653 页
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;
// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;
// read from second page
for (var page = 2; page <= pages && this.cts.Token.IsCancellationRequested == false; page++)
{
response = await api.GetTickersAsync(BatchSize, page, this.cts.Token);
list.AddRange(response.Tickers);
this.logger.LogInformation($"adding {response.Tickers.Length} records from page {page}");
}
var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count} in {duration.Humanize()}");
更新
IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.Select(page =>
Observable
.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.Merge(MaxConcurrentDownloads)
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();
list = await query.ToTask(this.cts.Token);
最佳答案
您正在对同步代码、枚举、与 Rx 和任务进行很多奇怪的混合。所有这些都会在调试时造成很大的困惑。你应该选择一个 monad 并一直呆在它里面——不要混合它们。
你能试试这个纯 Rx 版本的代码,让我知道你得到什么样的结果吗?请附加到您的问题的末尾,不要更改那里的内容。
IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => api.GetTickersAsync(BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.SelectMany(page =>
Observable
.FromAsync(ct => api.GetTickersAsync(BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();
IList<TickerV2> list = await query;
api
Defer
中的对象:
IObservable<IList<TickerV2>> query =
Observable
.Defer(() =>
{
var api = new PolygonWebApi(httpClient, this.apiKey);
return
Observable
.Using(... as above ...)
.ToList();
});
关于c# - Enumerable.Range/Observable.FromAsync 上的 Reactive Extensions OperationCancelled 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62625198/
我创建了以下 sub 来简单地说明问题。我将事件工作表的范围 A2:E10 分配给范围变量。然后,对于另一个范围变量,我将这个范围的子范围,单元格 (1, 1) 分配给 (3, 3)。 我原以为这将包
我使用正则表达式来搜索以下属性返回的纯文本: namespace Microsoft.Office.Interop.Word { public class Range {
我正在开发一个宏来突出显示某些行/单元格以供进一步审查。一些值/空白将以红色突出显示,其他以橙色突出显示,而整行应为黄色。我从上一个问题中得到了一些帮助,并添加了更多细节,它工作得几乎完美,但我被困在
这个问题在这里已经有了答案: What is the difference between range and xrange functions in Python 2.X? (28 个答案) 关闭
我在尝试运行脚本时遇到这个奇怪的错误,代码似乎是正确的,但似乎 python (3) 不喜欢这部分: def function(x): if int
我正在编写一种算法,将一些数据写入提供的输出范围(问题的初始文本包括具体细节,这将评论中的讨论转向了错误的方向)。我希望它在 API 中尽可能接近标准库中的其他范围算法。 我查看了 std::rang
这按预期工作: #include #include int main() { auto chunklist = ranges::views::ints(1, 13) | ranges::vie
我这里有一个字符串,我正在尝试对其进行子字符串化。 let desc = "Hello world. Hello World." var stringRange = 1..' 的值转换为预期的参数类型
我有一个高级搜索功能,可以根据日期和时间查询记录。我想返回日期时间范围内的所有记录,然后从该范围内返回我想将结果缩小到一个小时范围(例如 2012 年 5 月 1 日 - 2012 年 5 月 7 日
Go 中的 range 函数和 range 关键字有什么区别? func main(){ s := []int{10, 20, 30, 40, 50, 60, 70, 80, 90}
如果我有一个范围,如何将其拆分为一系列连续的子范围,其中指定了子范围(存储桶)的数量?如果没有足够的元素,则应省略空桶。 例如: splitRange(1 to 6, 3) == Seq(Range(
我正在开发 VSTO Excel 项目,但在管理 Range 对象时遇到一些问题。 实际上,我需要知道当前选定的范围是否与我存储在列表中的另一个范围重叠。所以基本上,我有 2 个 Range 实例,我
在即将推出的 C++20 系列中,将有 range concept具有以下定义: template concept range = __RangeImpl; // exposition-only de
希望有人能回答我的问题。我在 VHDL 代码中遇到了这个命令,但不确定它到底做了什么。有人可以澄清以下内容吗? if ( element1 = (element1'range => '0')) the
可以将范围嵌套在范围中吗?使用范围内的变量?因为我想取得一些效果。为了说明这个问题,我有以下伪代码: for i in range(str(2**i) for i in range(1,2)):
我想在 2 个日期之间创建一个范围,并且我的范围字段有时间 damage_list = Damage.objects.filter(entry_date__range=(fdate, tdate))
在下面的代码中 #include #include #include int main() { std::unordered_mapm; m["1"]=1; m["2"]=2
我试图为我的电子表格做一个简单的循环,它循环遍历一个范围并检查该行是否为空,如果不是,则循环遍历一系列列并检查它们是否为空,如果是则它设置一个消息。 问题是每次它通过循环 ro.value 和 col
我在将一个工作簿范围中的值分配给当前工作簿中的某个范围时遇到问题。当我使用 Range("A1:C1") 分配我的范围时,此代码工作正常,但是当我使用 Range(Cells(1,1),Cells(1
我改写了原来的问题。 Sub s() Dim r As Range Set r = ActiveSheet.Range("B2:D5") Debug.Print r.Rows.Count
我是一名优秀的程序员,十分优秀!