gpt4 book ai didi

system.reactive - 使用 Rx 合并历史和实时股票价格数据

转载 作者:行者123 更新时间:2023-12-04 02:25:18 25 4
gpt4 key购买 nike

我正在尝试 Rx,因为它似乎很适合我们的领域,但学习曲线让我感到惊讶。

我需要将历史价格数据与实时价格数据结合在一起。

我正在尝试将执行此操作的常用方法调整为 Rx 语言:

  • 立即订阅实时价格并开始缓冲我返回的值
  • 发起对历史价格数据的请求(这需要在订阅实时价格之后发生,因此我们的数据中没有任何差距)
  • 发布历史价格回来
  • 收到所有历史数据后,发布缓冲的实时数据,删除与开头历史数据重叠的所有值
  • 继续从实时价格源中重放数据

  • 我有这个令人作呕且不正确的稻草人代码,它似乎适用于我编写的幼稚测试用例:
    IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
    live.Connect();

    IObservable<Tick> historical = historyService.For(since, symbol);

    return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

    private static Func1<Tick,bool> TicksAreInChronologicalOrder()
    {
    // Some stateful predicate comparing the timestamp of this tick
    // to the timestamp of the last tick we saw
    }

    这有一些缺点
  • 不知道适当的重放缓冲区大小。设置无限缓冲区是不可能的 - 这是一个长时间运行的序列。我们真的想要某种一次性缓冲区,在第一次调用 Subscribe 时刷新。如果这在 Rx 中存在,我找不到它。
  • 即使我们切换到发布实时价格,重播缓冲区也将继续存在。我们此时不需要缓冲区。
  • 类似地,一旦我们跳过了历史价格和实时价格之间的初始重叠,就不需要过滤重叠报价的谓词。我真的很想做这样的事情:live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */) .是 Wait(this IObservable<TSource>)这里有用吗?

  • 必须有更好的方法来做到这一点,但我仍在等待我的大脑像 FP 一样理解 Rx。

    我考虑过解决 1. 的另一个选择是编写我自己的 Rx 扩展,它是 ISubject将消息排队直到它获得第一个订阅者(然后拒绝订阅者?)。也许这就是要走的路?

    最佳答案

    如果您的历史数据和实时数据都是基于时间或调度程序的,也就是说,随着时间的推移,事件流看起来像这样:

    |---------------------------------------------------->  time
    h h h h h h historical
    l l l l l l live

    您可以使用一个简单的 TakeUntil构造:
    var historicalStream = <fetch historical data>;
    var liveStream = <fetch live data>;

    var mergedWithoutOverlap =
    // pull from historical
    historicalStream
    // until we start overlapping with live
    .TakeUntil(liveStream)
    // then continue with live data
    .Concat(liveStream);

    如果您一次性获取所有历史数据,例如 IEnumerable<T> ,您可以使用 StartWith 的组合和你的其他逻辑:
    var historicalData = <get IEnumerable of tick data>;
    var liveData = <get IObservable of tick data>;

    var mergedWithOverlap =
    // the observable is the "long running" feed
    liveData
    // But we'll inject the historical data in front of it
    .StartWith(historicalData)
    // Perform filtering based on your needs
    .Where( .... );

    关于system.reactive - 使用 Rx 合并历史和实时股票价格数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14813963/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com