gpt4 book ai didi

c# - Enumerable.Range/Observable.FromAsync 上的 Reactive Extensions OperationCancelled 异常

转载 作者:行者123 更新时间:2023-12-03 12:58:28 28 4
gpt4 key购买 nike

我有以下代码,它从 REST 分页 API 中提取数据。
当使用响应式(Reactive)扩展时,它接近下载的结尾(即已知目标 1,653 中的第 1,636 页,它获得的确切计数取决于具有较高并发性的并发提取,从而导致已知目标的页面计数较低)。然后我的接收函数抛出一个 OperationCancelled异常(但是我从未设置我的取消 token 源)。
就像 Select正在以某种方式取消我的功能,但仅在分页请求接近尾声时,或者 observable 终止然后终止我的 observable,我认为(但对 rx.net 来说是新手)。
这也不是速率限制问题,一次下载一次即可(MaxConcurrentDownloads 设置为 1)。
有什么想法我做错了什么吗?

using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;

// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;

var query = Enumerable
.Range(2, pages - 1)
.ToObservable()
.Select(page => Observable.FromAsync(() =>
{
return api
.GetTickersAsync(BatchSize, page, this.cts.Token)
.ContinueWith( x => new TickersResponseWithPage(page, x.Result));
}))
.Merge(MaxConcurrentDownloads);

query.Subscribe((response) =>
{
this.logger.LogInformation($"adding {response.TickersResponse.Tickers.Length} records from page {response.Page}");
list.AddRange(response.TickersResponse.Tickers);
});
await query.ToTask(this.cts.Token);

var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count:n0} in {duration.Humanize()}");
如果需要附加信息,顺序测试证明对 API 的调用是正常的并返回所有 1,653 页
using var httpClient = new HttpClient();
var api = new PolygonWebApi(httpClient, this.apiKey);
var list = new List<TickerV2>();
var start = DateTime.Now;


// get first page
var response = await api.GetTickersAsync(BatchSize, 1, this.cts.Token);
list.AddRange(response.Tickers);
var pages = (response.Count + BatchSize - 1) / BatchSize;

// read from second page
for (var page = 2; page <= pages && this.cts.Token.IsCancellationRequested == false; page++)
{
response = await api.GetTickersAsync(BatchSize, page, this.cts.Token);
list.AddRange(response.Tickers);
this.logger.LogInformation($"adding {response.Tickers.Length} records from page {page}");
}

var duration = DateTime.Now - start;
this.logger.LogInformation($"{nameof(UpdateTickersWorker)} downloaded {list.Count} in {duration.Humanize()}");
更新
我修改了以下内容以停止重复页面,似乎已经解决了重复问题:
IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.Select(page =>
Observable
.FromAsync(ct => PolygonWebApi.GetTickersAsync(hc, this.apiKey, BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.Merge(MaxConcurrentDownloads)
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();

list = await query.ToTask(this.cts.Token);

最佳答案

您正在对同步代码、枚举、与 Rx 和任务进行很多奇怪的混合。所有这些都会在调试时造成很大的困惑。你应该选择一个 monad 并一直呆在它里面——不要混合它们。
你能试试这个纯 Rx 版本的代码,让我知道你得到什么样的结果吗?请附加到您的问题的末尾,不要更改那里的内容。

IObservable<IList<TickerV2>> query =
Observable
.Using(
() => new HttpClient(),
hc =>
from first_response in Observable.FromAsync(ct => api.GetTickersAsync(BatchSize, 1, ct))
let pages = (first_response.Count + BatchSize - 1) / BatchSize
from trwp in
Observable
.Range(2, pages - 1)
.SelectMany(page =>
Observable
.FromAsync(ct => api.GetTickersAsync(BatchSize, page, ct))
.Select(r => new TickersResponseWithPage(page, r)))
.StartWith(new TickersResponseWithPage(1, first_response))
from tv2 in trwp.TickersResponse.Tickers
select tv2)
.ToList();

IList<TickerV2> list = await query;

以下是如何创建 api Defer 中的对象:
IObservable<IList<TickerV2>> query =
Observable
.Defer(() =>
{
var api = new PolygonWebApi(httpClient, this.apiKey);
return
Observable
.Using(... as above ...)
.ToList();
});

关于c# - Enumerable.Range/Observable.FromAsync 上的 Reactive Extensions OperationCancelled 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62625198/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com