的序列就像这样: 当第-6ren">
gpt4 book ai didi

c# - Reactive 的 "Buffer until quiet"行为?

转载 作者:行者123 更新时间:2023-11-30 13:18:59 24 4
gpt4 key购买 nike

我的问题有点像 Nagle algorithm是为了解决问题而创建的,但不完全是。我想要的是缓冲 OnNext来自 IObservable<T> 的通知进入 IObservable<IList<T>> 的序列就像这样:

  1. 当第一个T通知到达,将其添加到缓冲区并开始倒计时
  2. 如果另一个T通知在倒计时到期前到达,将其添加到缓冲区并重新开始倒计时
  3. 一旦倒计时到期(即生产者已经沉默了一段时间),转发所有缓冲的 T作为单个聚合的通知 IList<T>通知。
  4. 如果在倒计时到期之前缓冲区大小超过某个最大值,则无论如何都要发送它。

IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)看起来很有希望,但它似乎会定期发送聚合通知,而不是执行我想要的“在第一个通知到达时启动计时器并在其他通知到达时重新启动它”的行为,并且它还会在以下位置发送一个空列表如果没有从下面产生通知,则每个时间窗口结束。

不想删除任何T通知;只是缓冲它们。

有这样的东西吗,还是我需要自己写?

最佳答案

SO 上存在一些类似的问题,但不完全是这样。这是一个可以解决问题的扩展方法。

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}

关于c# - Reactive 的 "Buffer until quiet"行为?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35557411/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com