gpt4 book ai didi

C# react 扩展当 OnNext 花费很长时间并且可观察到产生新事件时会发生什么

转载 作者:行者123 更新时间:2023-11-30 22:11:52 24 4
gpt4 key购买 nike

我是 Rx 的新手,我在想当 IObservable 非常快速地产生大量事件而 OnNext 需要很长时间时会发生什么。我想新事件在内部以某种方式排队,所以我可以运行我们的内存。我对吗?考虑以下小示例:

        Subject<int> subject = new Subject<int>();
subject.ObserveOn(Scheduler.ThreadPool).Subscribe(x => { Console.WriteLine("Value published: {0}", x); Thread.Sleep(100); },
() => Console.WriteLine("Sequence Completed."));

for (int i = 0; i < 10; i++)
{
subject.OnNext(i);
}

我发布了 10 个事件,消费者方法非常慢。所有事件都经过处理,因此必须将其缓存在内存中。如果我发布很多事件,我会用完内存,对吗?还是我错过了什么?

有没有办法限制响应式扩展中未决事件的数量?例如,如果有超过 5 个未决事件,我想忽略新事件。

最佳答案

是的,你是对的,缓慢的消费者会导致排队。与您所要求的内容最接近的内置运算符是 Observable.Sample - 但是,这会丢弃较旧的事件以支持较新的事件。这是更常见的要求,因为它可以让你的慢速观察者 catch 来。

让我知道示例是否足够,因为您描述的缓冲行为是一个不寻常的要求 - 它是可以实现的,但实现起来非常复杂并且需要相当重要的代码。

编辑 - 如果您像这样使用 Sample,它将在每次 OnNext 之后返回最新的事件(您需要为此提供一个调度程序 - 并且 NewThreadScheduler 创建一个线程每个订阅,而不是每个事件:

void Main()
{
var source = Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(100);

source.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
.Subscribe(SlowConsumer);

Console.ReadLine();
}

private void SlowConsumer(long item)
{
Console.WriteLine(item + " " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(1));
}

关于C# react 扩展当 OnNext 花费很长时间并且可观察到产生新事件时会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20018503/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com