gpt4 book ai didi

c# - 如何使用 Reactive Extensions 使用最大窗口大小来限制事件?

转载 作者:可可西里 更新时间:2023-11-01 09:01:10 24 4
gpt4 key购买 nike

场景:

我正在构建一个 UI 应用程序,每隔几毫秒从后端服务获取通知。收到新通知后,我想尽快更新 UI。

因为我可以在短时间内收到很多通知,而且我总是只关心最新的事件,所以我使用了 Reactive Extensions 框架的 Throttle() 方法。这让我可以忽略紧跟新通知的通知事件,因此我的 UI 保持响应。

问题:

假设我将通知事件的事件流限制为 50 毫秒,并且后端每 10 毫秒发送一次通知,则 Thottle() 方法将永远不会返回事件,因为它会一次又一次地重置其滑动窗口。在这里,我需要一些额外的行为来指定诸如超时之类的东西,以便在事件吞吐量如此之高的情况下,我每秒至少可以检索一个事件。我如何使用 Reactive Extensions 做到这一点?

最佳答案

正如 James 所说,Observable.Sample 将为您提供最新产生的值。但是,它将在计时器上执行此操作,而不是根据 throttle 中的第一个事件发生的时间。然而,更重要的是,如果您的采样时间很长(比如十秒),并且您的事件在采样后立即触发,那么您将近十秒不会收到该新事件。

如果您需要更紧凑的东西,您需要实现自己的功能。我冒昧地这样做了。此代码肯定需要一些清理,但我相信它可以满足您的要求。

public static class ObservableEx
{
public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
{
return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
}

public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
{
return Observable.Create<T>(o =>
{
var hasValue = false;
T value = default(T);

var maxTimeDisposable = new SerialDisposable();
var dueTimeDisposable = new SerialDisposable();

Action action = () =>
{
if (hasValue)
{
maxTimeDisposable.Disposable = Disposable.Empty;
dueTimeDisposable.Disposable = Disposable.Empty;
o.OnNext(value);
hasValue = false;
}
};

return source.Subscribe(
x =>
{
if (!hasValue)
{
maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action);
}

hasValue = true;
value = x;
dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action);
},
o.OnError,
o.OnCompleted
);
});
}
}

还有一些测试...

[TestClass]
public class ThrottleMaxTests : ReactiveTest
{
[TestMethod]
public void CanThrottle()
{

var scheduler = new TestScheduler();
var results = scheduler.CreateObserver<int>();

var source = scheduler.CreateColdObservable(
OnNext(100, 1)
);

var dueTime = TimeSpan.FromTicks(100);
var maxTime = TimeSpan.FromTicks(250);

source.ThrottleMax(dueTime, maxTime, scheduler)
.Subscribe(results);

scheduler.AdvanceTo(1000);

results.Messages.AssertEqual(
OnNext(200, 1)
);
}

[TestMethod]
public void CanThrottleWithMaximumInterval()
{

var scheduler = new TestScheduler();
var results = scheduler.CreateObserver<int>();

var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(175, 2),
OnNext(250, 3),
OnNext(325, 4),
OnNext(400, 5)
);

var dueTime = TimeSpan.FromTicks(100);
var maxTime = TimeSpan.FromTicks(250);

source.ThrottleMax(dueTime, maxTime, scheduler)
.Subscribe(results);

scheduler.AdvanceTo(1000);

results.Messages.AssertEqual(
OnNext(350, 4),
OnNext(500, 5)
);
}

[TestMethod]
public void CanThrottleWithoutMaximumIntervalInterferance()
{
var scheduler = new TestScheduler();
var results = scheduler.CreateObserver<int>();

var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(325, 2)
);

var dueTime = TimeSpan.FromTicks(100);
var maxTime = TimeSpan.FromTicks(250);

source.ThrottleMax(dueTime, maxTime, scheduler)
.Subscribe(results);

scheduler.AdvanceTo(1000);

results.Messages.AssertEqual(
OnNext(200, 1),
OnNext(425, 2)
);
}
}

关于c# - 如何使用 Reactive Extensions 使用最大窗口大小来限制事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20034476/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com