
c# - How to convert an IEnumerable to an IObservable using HistoricalScheduler

Reposted. Author: 太空狗. Updated: 2023-10-29 22:57:15

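I have an IEnumerable<T> where T allows an associated timestamp to be derived.

I want to convert it to an IObservable<T>, but using a HistoricalScheduler so that notifications occur according to the derived timestamps. That would let me use the built-in Rx methods for windowing, sliding windows, and so on, which is what I am ultimately trying to do.

Many suggestions for solving this recommend Generate(). However, this method causes StackOverflowExceptions. For example: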

    static void Main(string[] args)
    {
        var enumerable = Enumerable.Range(0, 2000000);
        var now = DateTimeOffset.Now;
        var scheduler = new HistoricalScheduler(now);
        var observable = Observable.Generate(
            enumerable.GetEnumerator(),
            e => e.MoveNext(),
            e => e,
            e => Timestamped.Create(e, now.AddTicks(e.Current)),
            e => now.AddTicks(e.Current),
            scheduler);
        var s2 = observable.Count().Subscribe(eventCount => Console.WriteLine("Found {0} events @ {1}", eventCount, scheduler.Now));
        scheduler.Start();
        s2.Dispose();
        Console.ReadLine();
    }

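This results in a stack overflow.

The standard ToObservable() method cannot be used: although it accepts a custom scheduler, it provides no mechanism to control how the resulting notifications are scheduled on that scheduler.

How can I convert an IEnumerable to an IObservable with explicitly scheduled notifications?

Update

I tried using Asti's code in the following test: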

    static void Main(string[] args)
    {
        var enumerable = Enumerable.Range(0, 2000000);
        var now = DateTimeOffset.Now;
        var series = enumerable.Select(i => Timestamped.Create(i, now.AddSeconds(i)));
        var ticks = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => now.AddSeconds(i));
        var scheduler = new HistoricalScheduler(now);
        Playback(series, ticks, scheduler).Subscribe(Console.WriteLine);
        scheduler.Start();
    }

But it throws an ArgumentOutOfRangeException:

Specified argument was out of the range of valid values.
Parameter name: time

at System.Reactive.Concurrency.VirtualTimeSchedulerBase`2.AdvanceTo(TAbsolute time)
at System.Reactive.AnonymousSafeObserver`1.OnNext(T value)
at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)
at System.Reactive.Linq.ObservableImpl.Timer.TimerImpl.Tick(Int64 count)
at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__1()
at System.Reactive.Concurrency.AsyncLock.Wait(Action action)
at System.Reactive.Concurrency.DefaultScheduler.<>c__DisplayClass7_0`1.<SchedulePeriodic>b__0()
at System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl.PeriodicTimer.Tick(Object state)
at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.TimerQueueTimer.CallCallback()
at System.Threading.TimerQueueTimer.Fire()
at System.Threading.TimerQueue.FireNextTimers()
at System.Threading.TimerQueue.AppDomainTimerCallback()

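Best answer

We built an operator that replays an ordered sequence of events on a historical scheduler, with time advancing according to a specified observable.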

    public static IObservable<T> Playback<T>(
        this IEnumerable<Timestamped<T>> enumerable,
        IObservable<DateTimeOffset> ticks,
        HistoricalScheduler scheduler = default(HistoricalScheduler)
    )
    {
        return Observable.Create<T>(observer =>
        {
            scheduler = scheduler ?? new HistoricalScheduler();

            //create enumerator of sequence - we're going to iterate through it manually
            var enumerator = enumerable.GetEnumerator();

            //set scheduler time for every incoming value of ticks
            var timeD = ticks.Subscribe(scheduler.AdvanceTo);

            //declare an iterator
            Action scheduleNext = default(Action);
            scheduleNext = () =>
            {
                //move
                if (!enumerator.MoveNext())
                {
                    //no more items
                    //sequence has completed
                    observer.OnCompleted();
                    return;
                }

                //current item of enumerable sequence
                var current = enumerator.Current;

                //schedule the item to run at the timestamp specified
                scheduler.ScheduleAbsolute(current.Timestamp, () =>
                {
                    //push the value forward
                    observer.OnNext(current.Value);

                    //schedule the next item
                    scheduleNext();
                });
            };

            //start the process by scheduling the first item
            scheduleNext();

            //dispose the enumerator and subscription to ticks
            return new CompositeDisposable(timeD, enumerator);
        });
    }
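
Because the operator forwards every tick to scheduler.AdvanceTo, the ticks have to move the clock forward only. The following is a minimal sketch of that constraint, not part of the original answer: the class and method names (AdvanceToSketch, RewindThrows) and the fixed start date are made up for illustration. It is consistent with the ArgumentOutOfRangeException on the `time` parameter reported in the question's update, which would arise if scheduler.Start() has already run the clock ahead of the values that Interval later delivers.

```csharp
using System;
using System.Reactive.Concurrency;

public static class AdvanceToSketch
{
    // Hypothetical helper: returns true if moving the virtual clock
    // backwards is rejected with ArgumentOutOfRangeException.
    public static bool RewindThrows()
    {
        // Fixed start time, chosen arbitrarily for the illustration.
        var start = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
        var scheduler = new HistoricalScheduler(start);

        // Moving forward is allowed.
        scheduler.AdvanceTo(start.AddSeconds(10));

        try
        {
            // Asking for a time earlier than the current clock.
            scheduler.AdvanceTo(start.AddSeconds(5));
            return false;
        }
        catch (ArgumentOutOfRangeException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Rewind rejected: {0}", RewindThrows());
    }
}
```

Under that reading, a caller would pick one clock driver: either the ticks observable or scheduler.Start(), not both.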

Porting your earlier example:

    var enumerable = Enumerable.Range(0, 20000000);
    var now = DateTimeOffset.Now;
    var series = enumerable.Select(i => Timestamped.Create(i, now.AddSeconds(i)));
    var ticks = Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => now.AddSeconds(i));

    series.Playback(ticks).Subscribe(Console.WriteLine);

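The enumerable is read one item at a time, so it stays lazy, and the clock is set with a simple Interval observable. Decreasing the interval makes playback faster.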
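
If the goal is to run the whole sequence in virtual time as fast as possible, as in the question's Count() example, one option is to leave out the ticks observable and let scheduler.Start() drive the clock. The following is a sketch under that assumption, not the answer's own code: PlayOn, Run, and the fixed start date are hypothetical names and values introduced here. Each item schedules its successor only when it fires, so the scheduler queue holds one pending item at a time and the enumerable is still read lazily.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PlayOnSketch
{
    // Hypothetical variant of Playback without the ticks parameter.
    public static IObservable<T> PlayOn<T>(
        this IEnumerable<Timestamped<T>> source,
        HistoricalScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var enumerator = source.GetEnumerator();

            Action scheduleNext = null;
            scheduleNext = () =>
            {
                if (!enumerator.MoveNext())
                {
                    observer.OnCompleted();
                    return;
                }

                var current = enumerator.Current;

                // Emit the value at its own timestamp, then queue the next item.
                scheduler.ScheduleAbsolute(current.Timestamp, () =>
                {
                    observer.OnNext(current.Value);
                    scheduleNext();
                });
            };

            scheduleNext();

            // Disposing the subscription disposes the enumerator.
            IDisposable cleanup = enumerator;
            return cleanup;
        });
    }

    // Replays `items` values one virtual second apart and reports
    // how many were seen and where the virtual clock ended up.
    public static (long Count, DateTimeOffset End) Run(int items)
    {
        var start = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero);
        var scheduler = new HistoricalScheduler(start);
        var series = Enumerable.Range(0, items)
            .Select(i => Timestamped.Create(i, start.AddSeconds(i)));

        long count = 0;
        series.PlayOn(scheduler).Subscribe(_ => count++);

        // Runs every queued item in timestamp order, advancing the clock.
        scheduler.Start();
        return (count, scheduler.Now);
    }

    public static void Main()
    {
        var (count, end) = Run(1000);
        Console.WriteLine("Found {0} events @ {1}", count, end);
    }
}
```

Time-based operators such as Window or Buffer would need the same scheduler instance passed to them so that they observe the virtual clock rather than wall-clock time.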

Regarding "c# - How to convert an IEnumerable to an IObservable using HistoricalScheduler", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/41072709/
