gpt4 book ai didi

c# - .Net 的 Rx : how to combine Scan with Throttle

转载 作者:行者123 更新时间:2023-11-30 20:24:27 25 4
gpt4 key购买 nike

我的问题是:对于给定的事件序列,我想缓存它们的值,直到流中出现暂停。然后,我将批量处理所有缓存数据并清除缓存状态。

一个天真的方法是(不是工作代码,可能存在一些错误):

struct FlaggedData
{
public EventData Data { get; set; }
public bool Reset { get; set; }
}

...

IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();

var flaggedDataStream = eventsStream
.Select(data => new FlaggedData { Data = data })
.Merge(resetSignal)
.Scan(
new List<EventData>(),
(cache, flaggedData) =>
{
if (!flaggedData.Reset())
{
cache.Add(flaggedData.Data);
return cache;
}

return new List<EventData>();
})
.Throttle(SomePeriodOfTime)
.Subscribe(batch =>
{
resetSignal.OnNext(new FlaggedData { Reset = true});
ProcessBatch(batch);
});

所以在这里,在收到任何要处理的批处理后,我请求重置缓存。问题是,由于 Throttle,缓存中可能会有一些数据(或者我相信),在这种情况下这些数据会丢失。

我想要的是一些像这样的操作:

ScanWithThrottling<TAccumulate, TSource>(
IObservable<TSource> source,
Func<TAccumulate, TSource, TAccumulate> aggregate,
TimeSpan throttlingSpan)

它返回一个 observable,每次调用其订阅者的 OnNext 时都会重置累积值。

当然,我可以编写自己的扩展,但问题是是否有某种方法可以使用标准 Rx 操作实现相同的效果。

最佳答案

我认为这里有一个简单的方法。使用 Buffer()像这样基于 throttle 缓冲元素:

var buffered = source.Publish(ps =>        
ps.Buffer(() => ps.Throttle(SomePeriodOfTime)));

这将缓冲元素,直到存在 SomePeriodOfTime 的间隙并将它们呈现为列表。无需担心“重置”方面,您不会丢失元素。

Publish的使用确保有一个单一的共享订阅源事件可以被 Buffer 使用每个 Throttle . throttle 是缓冲区关闭函数,提供一个指示新缓冲区应该启动的信号。

这是一个可测试的版本 - 我只是在这里转储每个缓冲区的长度并使用 Timestamp添加时间信息,但它是 IList<T>你得到原始缓冲流。请注意调度程序是如何作为参数提供给基于时间的操作以启用测试的。

请注意,您将需要 nuget 包 rx-testing 来运行此示例,以引入 Rx 测试框架并获得 TestSchedulerReactiveTest类型:

void Main()
{
var scenarios = new Scenarios();
scenarios.Scenario1();
}

public class Scenarios : ReactiveTest
{
public void Scenario1()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(800, 4),
OnNext(900, 5),
OnNext(1400, 6),
OnNext(1600, 7),
OnNext(1700, 8),
OnNext(1800, 9));

var duration = TimeSpan.FromTicks(300);

var buffered = source.Publish(ps =>
ps.Buffer(() => ps.Throttle(duration, scheduler)));

buffered.Timestamp(scheduler).Subscribe(
x => Console.WriteLine("Timestamp: {0} Value: {1}",
x.Timestamp.Ticks, x.Value.Count()));

scheduler.Start();

}
}

关于c# - .Net 的 Rx : how to combine Scan with Throttle,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26773512/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com