- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 IEnumerable<T>
其中 T 允许推导相关时间戳。
我想将其转换为 IObservable<T>
但我想使用 HistoricalScheduler
这样通知就会根据派生的时间戳发生。这样做允许使用内置的 RX 方法进行窗口化、滑动窗口等,这正是我最终要尝试使用的。
许多关于如何解决这个问题的建议建议使用 Generate()
.然而,这method causes StackOverflowExceptions .例如:
static void Main(string[] args)
{
var enumerable = Enumerable.Range(0, 2000000);
var now = DateTimeOffset.Now;
var scheduler = new HistoricalScheduler(now);
var observable = Observable.Generate(
enumerable.GetEnumerator(),
e => e.MoveNext(),
e => e,
e => Timestamped.Create(e, now.AddTicks(e.Current)),
e => now.AddTicks(e.Current),
scheduler);
var s2 = observable.Count().Subscribe(eventCount => Console.WriteLine("Found {0} events @ {1}", eventCount, scheduler.Now));
scheduler.Start();
s2.Dispose();
Console.ReadLine();
}
这将导致堆栈溢出。
标准ToObservable()
方法不能使用,因为虽然它允许指定自定义调度程序,但它不提供任何机制来控制结果通知如何在该调度程序上调度。
如何转换 IEnumerable
到IObservable
有明确安排的通知?
尝试在以下测试中使用 Asti 的代码:
static void Main(string[] args)
{
var enumerable = Enumerable.Range(0, 2000000);
var now = DateTimeOffset.Now;
var series = enumerable.Select(i => Timestamped.Create(i, now.AddSeconds(i)));
var ticks = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => now.AddSeconds(i));
var scheduler = new HistoricalScheduler(now);
Playback(series,ticks,scheduler).Subscribe(Console.WriteLine);
scheduler.Start();
}
但是它会抛出一个 ArgumentOutOfRangeException
:
Specified argument was out of the range of valid values.
Parameter name: time
at System.Reactive.Concurrency.VirtualTimeSchedulerBase`2.AdvanceTo(TAbsolute time)
at System.Reactive.AnonymousSafeObserver`1.OnNext(T value)
at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)
at System.Reactive.Linq.ObservableImpl.Timer.TimerImpl.Tick(Int64 count)
at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__1()
at System.Reactive.Concurrency.AsyncLock.Wait(Action action)
at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__0()
at System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl.PeriodicTimer.Tick(Object state)
at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.TimerQueueTimer.CallCallback()
at System.Threading.TimerQueueTimer.Fire()
at System.Threading.TimerQueue.FireNextTimers()
at System.Threading.TimerQueue.AppDomainTimerCallback()
最佳答案
我们制作了一个运算符,它根据指定的可观察值随着时间的推移在历史调度程序上重播有序的事件序列。
public static IObservable<T> Playback<T>(
this IEnumerable<Timestamped<T>> enumerable,
IObservable<DateTimeOffset> ticks,
HistoricalScheduler scheduler = default(HistoricalScheduler)
)
{
return Observable.Create<T>(observer =>
{
scheduler = scheduler ?? new HistoricalScheduler();
//create enumerator of sequence - we're going to iterate through it manually
var enumerator = enumerable.GetEnumerator();
//set scheduler time for every incoming value of ticks
var timeD = ticks.Subscribe(scheduler.AdvanceTo);
//declare an iterator
Action scheduleNext = default(Action);
scheduleNext = () =>
{
//move
if (!enumerator.MoveNext())
{
//no more items
//sequence has completed
observer.OnCompleted();
return;
}
//current item of enumerable sequence
var current = enumerator.Current;
//schedule the item to run at the timestamp specified
scheduler.ScheduleAbsolute(current.Timestamp, () =>
{
//push the value forward
observer.OnNext(current.Value);
//schedule the next item
scheduleNext();
});
};
//start the process by scheduling the first item
scheduleNext();
//dispose the enumerator and subscription to ticks
return new CompositeDisposable(timeD, enumerator);
});
}
移植你之前的例子,
var enumerable = Enumerable.Range(0, 20000000);
var now = DateTimeOffset.Now;
var series = enumerable.Select(i => Timestamped.Create(i, now.AddSeconds(i)));
var ticks = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => now.AddSeconds(i));
series.Playback(ticks).Subscribe(Console.WriteLine);
我们通读可枚举以保持惰性,并使用简单的 Interval
observable 设置时钟。减少间隔会使其播放速度更快。
关于c# - 如何使用 HistoricalScheduler 将 IEnumerable 转换为 IObservable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41072709/
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
假设我有一个返回 IObservable 的函数并且这个函数需要初始状态。 let myObservable (initialState: T) :IObservable = (...) 但我只能从另
我有一个鼠标左键状态流: var leftMouseButton = mouse.Select(x => x.LeftButton).DistinctUntilChanged(); 然后我Window
我有一个“值(value)观”IObservable这是返回 T必须按顺序组合成可变长度数组的元素,我有一个“控制”IObservable这告诉我下一个数组必须有多长。删除一个元素、重复它或将结果打乱
微软推出了 IObservable interface到 BCL 与 .NET Framework 4,我想,“太棒了,终于,我必须使用它!”因此,我深入挖掘并阅读帖子和文档,甚至实现了该模式。 这样
我有一个 IObservable 类型的流(热可观察)并想将其分成 IObservable 的两个可观察对象和 IObservable 我天真地尝试了以下但我只得到 flowStream人口稠密。 I
在 Windows Phone 7 上,IObservable 有一个新版本的 BufferWithTimeOrCount 扩展方法,它返回“流的流”而不是以前的“列表流”。我在尝试使用新方法或旧方法
更新:查看底部的示例 我需要在课间发消息。发布者将无限循环,调用一些方法来获取数据,然后将该调用的结果传递给 OnNext .可以有很多订阅者,但应该只有一个 IObservable 和一个长期运行的
我想创建一个可用于表示动态计算值的类,而另一个表示值的类可以是这些动态计算值的源(主题)。目标是当主题发生变化时,计算值会自动更新。 在我看来,使用 IObservable/IObserver 是可行
我有一个返回接口(interface) IObservable 的方法(在 silverlight 中)并希望将其转换为另一个 IObservable ? 那么我需要用什么来代替“CONVERT_SO
这两种方法有什么区别,每种方法的最佳情况是什么?我知道他们都能够附加一个函数来处理来自 IObservable 的排放,但我并不真正理解除此之外的差异。 编辑 对不起,我应该指定的。 IObserva
我的系统有很多状态对象——连接状态、CPU 负载、登录用户等等。所有此类事件都合并到单个可观察流中。 我想制作一个管理实用程序来显示系统的实际状态并显示所有这些计数器。 我如何创建一个包含所有计数器的
我有一个 IObservable我把它变成一个IObservable使用一些中间步骤: var observedXDocuments = from b in observedBytes
Although it is possible to attach an observer to multiple providers, the recommended pattern is to a
在我当前正在开发的系统中,我有许多被定义为接口(interface)和基类的组件。系统的每个部分都有一些与系统其他部分交互的特定点。 例如,数据准备组件准备了一些数据,最终需要进入数据处理部分,通信组
我有一个 IObservable包含 XML 文档(的片段)。我想把一个变成另一个。因此,例如,假设我有以下从我的 IObservable 推送的片段(每行包含一个片段): 获取以下文件: 我一
我有一个 IObservable这给了我字节数组中不确定数量的字节。我想知道我是如何从那开始返回 IObservable 的每个字节数组中有一定数量的字节。假设我们一次需要 10 个字节。 也就是说,
我在我的一个项目中使用了 IObserver/IObservable 接口(interface)。 CommandReader 是一个 IObservable,它不断从流中读取数据,然后将其传递给它的
例如,考虑一下: public IDisposable Subscribe(IObserver observer) { return eventStream.Where
当我写 .Subscribe 时我经常发现 Resharper 为我选择了以下重载,位于 mscorlib 中,Version=4.0.0.0: namespace System { public
我是一名优秀的程序员,十分优秀!