gpt4 book ai didi

c# - 响应式(Reactive)管道——如何控制并行度?

转载 作者:行者123 更新时间:2023-11-30 14:52:46 24 4
gpt4 key购买 nike

我正在构建一个简单的处理管道,其中一个项目作为输入获取,它由多个处理器按顺序操作,最后输出。下图描述了整体架构:

rx-pipe

目前的工作方式:Pipeline 正在尽可能快地从提供程序中获取项目。一旦获取了一个项目,它就会被传递给处理器。处理项目后,将通知输出。虽然单个项目是按顺序处理的,但多个项目可能会并行处理(取决于它们从提供商处获取的速度)。

创建并从管道返回的 IObservable 如下所示:

return Observable.Create<T>(async observer =>
{
while (_provider.HasNext)
{
T item = await _provider.GetNextAsync();
observer.OnNext(item);
}
}).SelectMany(item => Observable.FromAsync(() =>
_processors.Aggregate(
seed: Task.FromResult(item),
func: (current, processor) => current.ContinueWith( // Append continuations.
previous => processor.ProcessAsync(previous.Result))
.Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}};

缺少的部分:我需要一种控制机制来控制在任何给定时间管道中可以有多少项目(最大值)

例如,如果最大并行处理数为 3,则将导致以下工作流程:

  1. 项目 1 被提取并传递给处理器。
  2. 项目 2 被提取并传递给处理器。
  3. 项目 3 被提取并传递给处理器。
  4. 项目 1 已完成处理。
  5. 项目 4 被提取并传递给处理器。
  6. 第 3 项已完成处理。
  7. 第 5 项被提取并传递给处理器。
  8. 等等...

最佳答案

Merge提供一个需要 max concurrency 的重载.

它的签名看起来像:IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);

这是您的示例的样子(我也重构了一些其他代码,您可以接受或离开):

return Observable
//Reactive while loop also takes care of the onComplete for you
.While(() => _provider.HasNext,
Observable.FromAsync(_provider.GetNextAsync))
//Makes return items that will only execute after subscription
.Select(item => Observable.Defer(() => {
return _processers.Aggregate(
seed: Observable.Return(item),
func: (current, processor) => current.SelectMany(processor.ProcessAsync));
}))
//Only allow 3 streams to be execute in parallel.
.Merge(3);

分解它的作用,

  1. While将检查每次迭代,如果 _provider.HasNext是真的,如果是这样,那么它将重新订阅以获得下一个值 _provider , 否则它会发出 onCompleted
  2. 在 select 内部创建了一个新的可观察流,但尚未使用 Defer 进行评估
  3. 返回的IObservable<IObservable<T>>传递给 Merge最多同时订阅 3 个可观察对象。
  4. 内部 observable 最终在它被订阅时进行评估。

备选方案 1

如果您还需要控制并行请求的数量,您需要更巧妙一些,因为您需要发出信号表明您的 Observable已准备好接受新值:

return Observable.Create<T>(observer => 
{
var subject = new Subject<Unit>();
var disposable = new CompositeDisposable(subject);

disposable.Add(subject
//This will complete when provider has run out of values
.TakeWhile(_ => _provider.HasNext)
.SelectMany(
_ => _provider.GetNextAsync(),
(_, item) =>
{
return _processors
.Aggregate(
seed: Observable.Return(item),
func: (current, processor) => current.SelectMany(processor.ProcessAsync))
//Could also use `Finally` here, this signals the chain
//to start on the next item.
.Do(dontCare => {}, () => subject.OnNext(Unit.Default));
}
)
.Merge(3)
.Subscribe(observer));

//Queue up 3 requests for the initial kickoff
disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

return disposable;
});

关于c# - 响应式(Reactive)管道——如何控制并行度?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30955097/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com