Is there a way to create a pipeline locally using zipline?
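I have set up zipline locally in PyCharm. The simulations work, and I also have access to premium data from Quandl (it updates automatically once I enter my API key). My question now is: how do I create a pipeline locally using zipline?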
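Zipline's documentation is challenging, and zipline.io is also down (as of 2021-04-05). Fortunately, Blueshift has documentation and sample code that show how to build a pipeline that can be run locally:
- Blueshift sample pipeline code is here. (Pipelines library here.)
- The Zipline documentation can be accessed from MLTrading (archive documentation here); challenging as it is, it is still useful.
- The full Blueshift sample pipeline code, modified to run locally through PyCharm, is below. Note, as I am sure you already know, that this is a poor strategy and you should not trade with it. It does, however, show how to instantiate a pipeline locally.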
"""
Title: Classic (Pedersen) time-series momentum (equal weights)
Description: This strategy uses past returns and goes long (short)
the positive (negative) n-percentile
Style tags: Momentum
Asset class: Equities, Futures, ETFs, Currencies
Dataset: All
"""
"""
Sources:
Overall Algorithm here:
https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/factors/time_series_momentum.py
Custom (Ave Vol Filter, Period Returns) Functions Here:
https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/library/pipelines/pipelines.py
"""
import numpy as np
from zipline.pipeline import CustomFilter, CustomFactor, Pipeline
from zipline.pipeline.data import EquityPricing
from zipline.api import (
    order_target_percent,
    schedule_function,
    date_rules,
    time_rules,
    attach_pipeline,
    pipeline_output,
)

def average_volume_filter(lookback, amount):
    """
    Returns a custom filter object for volume-based filtering.

    Args:
        lookback (int): lookback window size
        amount (int): amount to filter (high-pass)

    Returns:
        A custom filter object

    Examples::

        # from library.pipelines.pipelines import average_volume_filter
        pipe = Pipeline()
        volume_filter = average_volume_filter(200, 1000000)
        pipe.set_screen(volume_filter)
    """
    class AvgDailyDollarVolumeTraded(CustomFilter):
        inputs = [EquityPricing.close, EquityPricing.volume]

        def compute(self, today, assets, out, close_price, volume):
            # Mean dollar volume per asset over the lookback window
            dollar_volume = np.mean(close_price * volume, axis=0)
            high_volume = dollar_volume > amount
            out[:] = high_volume

    return AvgDailyDollarVolumeTraded(window_length=lookback)

def period_returns(lookback):
    """
    Returns a custom factor object for computing simple returns over
    period.

    Args:
        lookback (int): lookback window size

    Returns:
        A custom factor object.

    Examples::

        # from library.pipelines.pipelines import period_returns
        pipe = Pipeline()
        momentum = period_returns(200)
        pipe.add(momentum, 'momentum')
    """
    class SignalPeriodReturns(CustomFactor):
        inputs = [EquityPricing.close]

        def compute(self, today, assets, out, close_price):
            # First and last rows of the window are the oldest and newest closes
            start_price = close_price[0]
            end_price = close_price[-1]
            returns = end_price / start_price - 1
            out[:] = returns

    return SignalPeriodReturns(window_length=lookback)

def initialize(context):
    '''
    A function to define things to do at the start of the strategy
    '''
    # The context variables can be accessed by other methods
    context.params = {'lookback': 12,
                      'percentile': 0.1,
                      'min_volume': 1E7
                      }

    # Call rebalance function on the first trading day of each month
    schedule_function(strategy, date_rules.month_start(),
                      time_rules.market_close(minutes=1))

    # Set up the pipelines for strategies
    attach_pipeline(make_strategy_pipeline(context),
                    name='strategy_pipeline')

def strategy(context, data):
    generate_signals(context, data)
    rebalance(context, data)


def make_strategy_pipeline(context):
    pipe = Pipeline()

    # get the strategy parameters (lookback is in months, ~21 trading days each)
    lookback = context.params['lookback'] * 21
    v = context.params['min_volume']

    # Set the volume filter
    volume_filter = average_volume_filter(lookback, v)

    # compute past returns
    momentum = period_returns(lookback)
    pipe.add(momentum, 'momentum')
    pipe.set_screen(volume_filter)

    return pipe

def generate_signals(context, data):
    try:
        pipeline_results = pipeline_output('strategy_pipeline')
    except Exception:
        # No pipeline output available: trade nothing this period
        context.long_securities = []
        context.short_securities = []
        return

    p = context.params['percentile']
    momentum = pipeline_results

    long_candidates = momentum[momentum > 0].dropna().sort_values('momentum')
    short_candidates = momentum[momentum < 0].dropna().sort_values('momentum')

    n_long = len(long_candidates)
    n_short = len(short_candidates)
    n = int(min(n_long, n_short) * p)

    if n == 0:
        print("{}, no signals".format(data.current_dt))
        context.long_securities = []
        context.short_securities = []
        # Return here: index[-0:] would otherwise select every candidate
        return

    context.long_securities = long_candidates.index[-n:]
    context.short_securities = short_candidates.index[:n]

def rebalance(context, data):
    # weighting function: half the capital long, half short, equally weighted
    n = len(context.long_securities)
    if n < 1:
        return

    weight = 0.5 / n

    # square off old positions if any
    for security in context.portfolio.positions:
        if security not in context.long_securities and \
                security not in context.short_securities:
            order_target_percent(security, 0)

    # Place orders for the new portfolio
    for security in context.long_securities:
        order_target_percent(security, weight)
    for security in context.short_securities:
        order_target_percent(security, -weight)
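
The code above defines the algorithm but does not show how to launch it from PyCharm. One way that should work is zipline's `run_algorithm` function; the following is only a minimal sketch, not part of the Blueshift example. The helper names `make_backtest_config` and `run_backtest`, the dates, and the starting capital are all assumptions made for illustration. It also assumes the `quandl` bundle has already been ingested with your API key. Depending on the zipline version, `start` and `end` may need to be UTC-aware (older 1.x releases) or timezone-naive (recent zipline-reloaded releases), so adjust the `tz` argument if you get a timezone error.

```python
def make_backtest_config():
    """Return illustrative backtest settings (every value is an assumption)."""
    return {
        "start": "2016-01-04",
        "end": "2017-12-29",
        "capital_base": 100_000,
        "bundle": "quandl",          # assumes this bundle was ingested already
        "data_frequency": "daily",
    }


def run_backtest(initialize, config=None):
    """Run a backtest for an algorithm's `initialize` function.

    pandas and zipline are imported lazily so that this module can be
    imported even where zipline is not installed.
    """
    import pandas as pd
    from zipline import run_algorithm

    cfg = dict(config or make_backtest_config())
    # Assumption: UTC-aware timestamps, as older zipline 1.x releases expect.
    # Drop tz="utc" if your version asks for timezone-naive dates.
    start = pd.Timestamp(cfg.pop("start"), tz="utc")
    end = pd.Timestamp(cfg.pop("end"), tz="utc")

    # No handle_data is passed: the strategy trades via schedule_function.
    return run_algorithm(start=start, end=end, initialize=initialize, **cfg)
```

With the strategy functions defined in the same file, calling `perf = run_backtest(initialize)` from a PyCharm run configuration should return a pandas DataFrame of daily performance that can be inspected or plotted.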