gpt4 book ai didi

python - 如何使用 Python 响应式扩展 (RxPY) 创建滴答时间序列?

转载 作者:太空宇宙 更新时间:2023-11-03 17:08:12 25 4
gpt4 key购买 nike

我的设置:

我有一个(时间,价格)元组中的股票价格数据列表:

from datetime import datetime
prices = [(datetime(2015, 1, 9), 101.9), (datetime(2015, 1, 12), 101.5), (datetime(2015, 1, 13), 101.7)]

我想把它变成 RxPY Observable这样我就可以逐笔回测交易策略:

def myStrategy(date, price):   # SELL if date is a Monday and price is above 101.6
strategy = 'SELL' if date.weekday() and price > 101.6 else 'BUY'
print 'date=%s price=%s strategy=%s' % (date, price, strategy)

我希望从 2015 年 1 月 12 日开始回溯测试,因此我假设我必须使用以下调度程序:

from rx.concurrency import HistoricalScheduler
scheduler = HistoricalScheduler(datetime(2015, 1, 12))

为了运行我的回测,我这样做:

from rx import Observable
observable = Observable.from_iterable(prices, scheduler=scheduler).timestamp()
observable.subscribe(lambda price: myStrategy(price.timestamp, price.value))
scheduler.start()

问题:

我希望看到:

date=2015-01-12 00:00:00 price=101.5 strategy=BUY
date=2015-01-13 00:00:00 price=101.7 strategy=SELL

但是我得到了

date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 9, 0, 0), 101.9) strategy=SELL
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 12, 0, 0), 101.5) strategy=SELL
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 13, 0, 0), 101.7) strategy=SELL

问题在于:

  • 时间戳错误:我得到今天的日期 2015-12-20 08:43:45.882000而不是历史日期(例如 datetime(2015, 1, 12) )
  • 价格仍包含时间成分
  • 调度程序没有按照我的要求在 2015 年 1 月 12 日启动,因为我发现 2015 年 1 月 9 日的数据点仍在使用。

我还尝试使用 scheduler.now() :

observable.subscribe(lambda price: myStrategy(scheduler.now(), price.value))

但是日期被卡在 date=2015-01-12 00:00:00由于某种原因:

date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 9, 0, 0), 101.9) strategy=BUY
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 12, 0, 0), 101.5) strategy=BUY
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 13, 0, 0), 101.7) strategy=BUY

如何解决上述问题并获得我最初预期的结果?

最佳答案

对 rx 来说也很陌生,

  • 我认为时间戳()需要调度程序参数。否则它根据 rx 文档,在某些默认调度程序上工作。
  • 您将整个元组(日期、价格)作为价格传递给myStrategy() 这就是它打印日期的原因。

“时间戳默认在超时调度程序上运行,但也有一个变体,允许您通过将其作为参数传递来指定调度程序。” http://reactivex.io/documentation/operators/timestamp.html

只有 rxjs 的文档,但 rx 的美妙之处在于一切都遵循。

请看看这是否适合您。

    observable = Observable.from_iterable(prices,scheduler=scheduler).timestamp(scheduler=scheduler)
observable.subscribe(lambda price: myStrategy(price.timestamp, price.value[1]))
scheduler.start()

关于python - 如何使用 Python 响应式扩展 (RxPY) 创建滴答时间序列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34379360/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com