如何使用 Python (RxPY) 的 Reactive Extensions 创建滴答作响的时间序列?
How to I create a ticking time series with Reactive Extensions for Python (RxPY)?
我的设置:
我有一个(时间,价格)元组的股票价格数据列表:
from datetime import datetime
prices = [(datetime(2015, 1, 9), 101.9), (datetime(2015, 1, 12), 101.5), (datetime(2015, 1, 13), 101.7)]
我想把它变成 RxPY Observable
这样我就可以逐笔回测交易策略:
def myStrategy(date, price): # SELL if date is a Monday and price is above 101.6
strategy = 'SELL' if date.weekday() and price > 101.6 else 'BUY'
print 'date=%s price=%s strategy=%s' % (date, price, strategy)
我希望从 2015 年 1 月 12 日开始回测,所以我假设我必须使用以下调度程序:
from rx.concurrency import HistoricalScheduler
scheduler = HistoricalScheduler(datetime(2015, 1, 12))
为了 运行 我做的回测:
from rx import Observable
observable = Observable.from_iterable(prices, scheduler=scheduler).timestamp()
observable.subscribe(lambda price: myStrategy(price.timestamp, price.value))
scheduler.start()
问题:
我希望看到:
date=2015-01-12 00:00:00 price=101.5 strategy=BUY
date=2015-01-13 00:00:00 price=101.7 strategy=SELL
但是我得到了
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 9, 0, 0), 101.9) strategy=SELL
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 12, 0, 0), 101.5) strategy=SELL
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 13, 0, 0), 101.7) strategy=SELL
问题是:
- 时间戳是错误的:我得到今天的日期
2015-12-20 08:43:45.882000
而不是历史日期(例如 datetime(2015, 1, 12)
)
- 价格仍然包含时间成分
- 调度程序没有按照我的要求在 2015 年 1 月 12 日启动,因为我看到 2015 年 1 月 9 日的数据点仍在使用。
我也试过使用 scheduler.now()
:
observable.subscribe(lambda price: myStrategy(scheduler.now(), price.value))
但由于某种原因,日期停留在 date=2015-01-12 00:00:00
:
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 9, 0, 0), 101.9) strategy=BUY
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 12, 0, 0), 101.5) strategy=BUY
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 13, 0, 0), 101.7) strategy=BUY
如何解决上述问题并获得我最初预期的结果?
对 rx 也很陌生,
- 我认为 timestamp() 需要 scheduler 参数。否则它
根据 rx 文档在某些默认调度程序上工作。
- 您正在将整个元组 (date,price) 作为价格传递给
myStrategy() 这就是它打印日期的原因。
"timestamp by default operates on the timeout Scheduler, but also has a variant that allows you to specify the Scheduler by passing it in as a parameter."
http://reactivex.io/documentation/operators/timestamp.html
只有 rxjs 的文档,但 rx 的美妙之处在于一切都遵循。
请看看这是否适合你。
observable = Observable.from_iterable(prices,scheduler=scheduler).timestamp(scheduler=scheduler)
observable.subscribe(lambda price: myStrategy(price.timestamp, price.value[1]))
scheduler.start()
我的设置:
我有一个(时间,价格)元组的股票价格数据列表:
from datetime import datetime
prices = [(datetime(2015, 1, 9), 101.9), (datetime(2015, 1, 12), 101.5), (datetime(2015, 1, 13), 101.7)]
我想把它变成 RxPY Observable
这样我就可以逐笔回测交易策略:
def myStrategy(date, price): # SELL if date is a Monday and price is above 101.6
strategy = 'SELL' if date.weekday() and price > 101.6 else 'BUY'
print 'date=%s price=%s strategy=%s' % (date, price, strategy)
我希望从 2015 年 1 月 12 日开始回测,所以我假设我必须使用以下调度程序:
from rx.concurrency import HistoricalScheduler
scheduler = HistoricalScheduler(datetime(2015, 1, 12))
为了 运行 我做的回测:
from rx import Observable
observable = Observable.from_iterable(prices, scheduler=scheduler).timestamp()
observable.subscribe(lambda price: myStrategy(price.timestamp, price.value))
scheduler.start()
问题:
我希望看到:
date=2015-01-12 00:00:00 price=101.5 strategy=BUY
date=2015-01-13 00:00:00 price=101.7 strategy=SELL
但是我得到了
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 9, 0, 0), 101.9) strategy=SELL
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 12, 0, 0), 101.5) strategy=SELL
date=2015-12-20 08:43:45.882000 price=(datetime.datetime(2015, 1, 13, 0, 0), 101.7) strategy=SELL
问题是:
- 时间戳是错误的:我得到今天的日期
2015-12-20 08:43:45.882000
而不是历史日期(例如datetime(2015, 1, 12)
) - 价格仍然包含时间成分
- 调度程序没有按照我的要求在 2015 年 1 月 12 日启动,因为我看到 2015 年 1 月 9 日的数据点仍在使用。
我也试过使用 scheduler.now()
:
observable.subscribe(lambda price: myStrategy(scheduler.now(), price.value))
但由于某种原因,日期停留在 date=2015-01-12 00:00:00
:
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 9, 0, 0), 101.9) strategy=BUY
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 12, 0, 0), 101.5) strategy=BUY
date=2015-01-12 00:00:00 price=(datetime.datetime(2015, 1, 13, 0, 0), 101.7) strategy=BUY
如何解决上述问题并获得我最初预期的结果?
对 rx 也很陌生,
- 我认为 timestamp() 需要 scheduler 参数。否则它 根据 rx 文档在某些默认调度程序上工作。
- 您正在将整个元组 (date,price) 作为价格传递给 myStrategy() 这就是它打印日期的原因。
"timestamp by default operates on the timeout Scheduler, but also has a variant that allows you to specify the Scheduler by passing it in as a parameter." http://reactivex.io/documentation/operators/timestamp.html
只有 rxjs 的文档,但 rx 的美妙之处在于一切都遵循。
请看看这是否适合你。
observable = Observable.from_iterable(prices,scheduler=scheduler).timestamp(scheduler=scheduler)
observable.subscribe(lambda price: myStrategy(price.timestamp, price.value[1]))
scheduler.start()