gpt4 book ai didi

c# - Rx 中的高级历史流和实时流

转载 作者:IT王子 更新时间:2023-10-29 04:44:28 34 4
gpt4 key购买 nike

我有一个热可观察对象,我通常使用下面的普通 Subject 来实现,这样感兴趣的人就可以订阅实时通知流。

现在我想保留那个直播流,但也公开了所有事件的历史流,这些事件已经发生并且有绝对时间附加到这些通知上,以了解它们发生的确切时间以及允许订阅者在重放时间顺序之前将历史流提前到任何时间点。

  • 我相信大部分可以通过 HistoricalScheduler 来实现及其 AdvanceTo 方法,但我不确定具体方法是什么?
  • 并且正在使用 Timestamped节省所需事件的时间?
  • 并且是一个ReplaySubject需要将实时流缓存到历史记录中,然后可以使用 HistoricalScheduler 回放?

这两个流究竟如何针对同一来源实现,或者换句话说,以下内容如何适应当前要求?

how to save time in .net

[参见"Replaying the past"标题]

最佳答案

HistoricalScheduler 为您提供的是控制调度程序虚拟时间向前运动的能力。

您没有得到的是随着时间的推移随机访问。由于虚拟时间提前,执行预定的 Action ,所以必须提前预定。在过去安排的任何操作 - 即在 HistoricalScheduler.Now 值之后的绝对时间 - 立即执行。

要重播事件,您需要以某种方式记录它们,然后使用 HistoricalScheduler 的实例安排它们 - 然后提前时间。

当您提前时间时,计划的操作会在其到期时间执行 - 当可观察对象向其订阅者发送 OnXXX() 时,调度程序的 Now 属性将具有当前虚拟时间。

每个订阅者都需要访问自己的调度程序,以便独立于其他订阅者控制时间。这实际上意味着为每个订阅者创建一个可观察对象。

这是我编写的一个简单示例(如果您引用 nuget 包 rx-main,它将在 LINQPad 中运行)。

首先,我录制了一个直播流(以完全非生产方式!),将事件记录到一个列表中。正如您所建议的,使用 TimeStamp() 可以很好地捕捉时间:

/* record a live stream */
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var log = source.Take(5).Timestamp().ToList().Wait();


Console.WriteLine("Time now is " + DateTime.Now);

现在我们可以结合使用 HistoricalScheduler 和巧妙地使用 Generate 来安排事件。请注意,这种方法可以防止大量预定事件提前排队——相反,我们一次只安排一个:

var scheduler = new HistoricalScheduler();

/* set up the scheduling of the recording events */
var replay = Observable.Generate(
log.GetEnumerator(),
events => events.MoveNext(),
events => events,
events => events.Current.Value,
events => events.Current.Timestamp,
scheduler);

现在我们订阅的时候可以看到HistoricalSchedulerNow属性有事件的虚拟时间:

replay.Subscribe(
i => Console.WriteLine("Event: {0} happened at {1}", i,
scheduler.Now));

最后我们可以启动时间表(使用 Start() 只是尝试播放所有事件,而不是使用 AdvanceTo 移动到特定时间 - 这就像做 AdvanceTo(DateTime.最大值);

scheduler.Start();

我的输出是:

Time now is 07/01/2014 15:17:27
Event: 0 happened at 07/01/2014 15:17:23 +00:00
Event: 1 happened at 07/01/2014 15:17:24 +00:00
Event: 2 happened at 07/01/2014 15:17:25 +00:00
Event: 3 happened at 07/01/2014 15:17:26 +00:00
Event: 4 happened at 07/01/2014 15:17:27 +00:00

结果是您可能最终不得不通过此工具创建自己的 API 以获得适合您特定目的的内容。它给您留下了相当多的工作量 - 但仍然是非常强大的东西。

很棒的是,实时可观察对象和重放可观察对象看起来确实没有什么不同——只要你记得始终参数化你的调度程序(!)——因此可以轻松地在它们上运行相同的查询,所有时间查询使用调度程序的虚拟时间。

我用它来测试对旧数据的新查询,以在商业场景中产生很好的效果。

它并不想成为传输控件,例如在 GUI 中提供在时间上来回滚动的服务。通常,您以大块的形式运行历史记录,存储新查询的输出,然后使用此数据在 GUI 中显示后续,以便用户可以通过您提供的其他机制随意来回移动。

最后,您不需要ReplaySubject 来缓存直播流;但您确实需要一些记录事件以进行重播的方法 - 这可能只是一个写入日志的观察者。

关于c# - Rx 中的高级历史流和实时流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20969829/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com