gpt4 book ai didi

c# - 可在 C# 中使用背压进行观察

转载 作者:行者123 更新时间:2023-12-05 03:35:55 27 4
gpt4 key购买 nike

C# rx 中有处理背压的方法吗?我正在尝试从分页查询的结果中调用 Web API。这个 web api 非常脆弱,我需要的并发调用不超过 3 个,因此,程序应该是这样的:

  1. 从数据库中提取一个页面
  2. 以页面上每条记录最多三个并发调用的方式调用 web api
  3. 将结果存回数据库
  4. 获取另一个页面并重复,直到没有更多结果。

我并没有真正得到我想要的序列,基本上数据库会得到所有记录,不管它们是否可以被处理。

我已经尝试了多种方法,包括调整 ObserveOn 运算符、实现信号量以及其他一些方法。我可以得到一些指导来实现这样的事情吗?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Castle.Core.Internal;
using Xunit;
using Xunit.Abstractions;

namespace ProductValidation.CLI.Tests.Services
{
public class Example
{
private readonly ITestOutputHelper output;

public Example(ITestOutputHelper output)
{
this.output = output;
}

[Fact]
public async Task RunsObservableToCompletion()
{
var repo = new Repository(output);
var client = new ServiceClient(output);

var results = repo.FetchRecords()
.Select(x => client.FetchMoreInformation(x).ToObservable())
.Merge(1)
.Do(async x => await repo.Save(x));

await results.LastOrDefaultAsync();
}
}

public class Repository
{
private readonly ITestOutputHelper output;

public Repository(ITestOutputHelper output)
{
this.output = output;
}

public IObservable<int> FetchRecords()
{
return Observable.Create<int>(async (observer) =>
{
var page = 1;
var products = await FetchPage(page);
while (!products.IsNullOrEmpty())
{
foreach (var product in products)
{
observer.OnNext(product);
}

page += 1;
products = await FetchPage(page);
}
observer.OnCompleted();
})
.ObserveOn(SynchronizationContext.Current);
}

private async Task<IEnumerable<int>> FetchPage(int page)
{
// Simulate fetching a paged query.
await Task.Delay(500).ToObservable().ObserveOn(new TaskPoolScheduler(new TaskFactory()));
output.WriteLine("Fetching page {0}", page);
if (page >= 4) return Enumerable.Empty<int>();
return Enumerable.Range(1, 3).Select(_ => page);
}

public async Task Save(string id)
{
await Task.Delay(50); //Simulates latency
}
}

public class ServiceClient
{
private readonly ITestOutputHelper output;
private readonly SemaphoreSlim semaphore;

public ServiceClient(ITestOutputHelper output)
{
this.output = output;
this.semaphore = new SemaphoreSlim(2);
}

public async Task<string> FetchMoreInformation(int id)
{
try
{
output.WriteLine("Calling the web client for {0}", id);
await semaphore.WaitAsync(); // Protection for the webapi not sending too many calls
await Task.Delay(1000); //Simulates latency
return id.ToString();
}
finally
{
semaphore.Release();
}
}
}
}

最佳答案

Rx 不支持背压,因此没有简单的方法以与处理记录相同的速度从 DB 中获取记录。也许你可以使用 Subject<Unit>作为一种信号机制,每次处理记录时都会推送一个值,并设计一种方法在生产站点使用这些信号在收到信号时从数据库中获取新记录。但这将是一个困惑且惯用的解决方案。 TPL Dataflow是比 Rx 更适合做这类工作的工具。它本身支持 BoundedCapacity 配置选项。

关于您发布的代码的一些评论,与背压问题没有直接关系:

Merge运算符 maxConcurrent参数对内部序列的并发订阅施加了限制,但如果内部序列已经启动并运行,则这将无效。所以你必须确保内部序列是冷的,一个方便的方法是 Defer运算符(operator):

.Select(x => Observable.Defer(() =>
client.FetchMoreInformation(x).ToObservable()))

将异步方法转换为延迟可观察序列的更常见方法是 FromAsync运算符(operator):

.Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))

顺便说一句 Do运算符不理解异步委托(delegate),所以不是:

.Do(async x => await repo.Save(x));

...创建 async void lambda,最好这样做:

.Select(x => Observable.FromAsync(() => repo.Save(x)))
.Merge(1);

更新:这是一个如何使用 SemaphoreSlim 的示例为了在 Rx 中实现背压:

const int boundedCapacity = 10;
using var semaphore = new SemaphoreSlim(boundedCapacity, boundedCapacity);

IObservable<int> results = repo
.FetchRecords(semaphore)
.Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))
.Merge(1)
.Select(x => Observable.FromAsync(() => repo.Save(x)))
.Merge(1)
.Do(_ => semaphore.Release());

await results.DefaultIfEmpty();

FetchRecords 里面方法:

//...
await semaphore.WaitAsync();
observer.OnNext(product);
//...

这是一个脆弱的解决方案,因为它依赖于通过管道传播所有元素。如果将来您决定在管道内包括过滤或节流,那么 WaitAsync 之间的一对一关系和 Release将被违反,最有可能的结果是管道死锁。

关于c# - 可在 C# 中使用背压进行观察,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69746475/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com