gpt4 book ai didi

c# - 响应式(Reactive)扩展是否支持滚动缓冲区?

转载 作者:IT王子 更新时间:2023-10-29 04:30:36 24 4
gpt4 key购买 nike

我正在使用响应式扩展将数据整理到 100 毫秒的缓冲区中:

this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);

这很好用。但是,我想要的行为与 Buffer 操作提供的行为略有不同。本质上,如果收到另一个数据项,我想重置计时器。只有当整个 100 毫秒都没有收到数据时,我才想处理它。这开启了从不处理数据的可能性,所以我也应该能够指定最大计数。我会想象一些类似的东西:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)

我环顾四周,没能在 Rx 中找到类似的东西?谁能证实/否认这一点?

最佳答案

这可以通过结合 Observable 的内置 WindowThrottle 方法来实现。 .首先,让我们解决忽略最大计数条件的更简单的问题:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
}

强大Window method做了繁重的工作。现在很容易看出如何添加最大计数:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
var closes = stream.Throttle(delay);
if (max != null)
{
var overflows = stream.Where((x,index) => index+1>=max);
closes = closes.Merge(overflows);
}
return stream.Window(() => closes).SelectMany(window => window.ToList());
}

我会在我的博客上写一篇解释这个的帖子。 https://gist.github.com/2244036

Window 方法的文档:

关于c# - 响应式(Reactive)扩展是否支持滚动缓冲区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7597773/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com