有没有办法使用 zipline 在本地创建管道?

Is there a way to create a pipeline locally using zipline?

我已经在 PyCharm 中本地配置好了 zipline。模拟可以正常运行,此外,我还可以访问来自 Quandl 的高级数据(输入我的 API 密钥后会自动更新)。现在我的问题是:如何使用 zipline 在本地创建管道(pipeline)?

Zipline 的文档不太容易读懂,而且 Zipline.io(截至 2021-04-05)也已无法访问。幸运的是,Blueshift 提供了文档和示例代码,展示了如何创建可以在本地运行的管道:

  • Blueshift 的示例管道代码在这里。(管道库在这里。)
  • Zipline 文档可以从 MLTrading 访问(存档文档在这里);尽管不易读懂,它仍然很有用。
  • 下面是 Blueshift 管道示例的完整代码,已修改为可以通过 PyCharm 在本地运行。请注意(相信您也已经知道),这是一个糟糕的策略,您不应该用它进行交易;不过,它确实展示了如何在本地实例化管道。

"""
    Title: Classic (Pedersen) time-series momentum (equal weights)
    Description: This strategy uses past returns and goes long (short)
                the positive (negative) n-percentile
    Style tags: Momentum
    Asset class: Equities, Futures, ETFs, Currencies
    Dataset: All
"""

"""
Sources:
Overall Algorithm here:
https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/factors/time_series_momentum.py

Custom (Ave Vol Filter, Period Returns) Functions Here: 
https://github.com/QuantInsti/blueshift-demo-strategies/blob/master/library/pipelines/pipelines.py
"""

import numpy as np

from zipline.pipeline import CustomFilter, CustomFactor, Pipeline
from zipline.pipeline.data import EquityPricing
from zipline.api import (
    order_target_percent,
    schedule_function,
    date_rules,
    time_rules,
    attach_pipeline,
    pipeline_output,
)


def average_volume_filter(lookback, amount):
    """
       Build a pipeline filter that passes assets whose average dollar
       volume is strictly greater than a threshold.

       Dollar volume is the element-wise product of close price and
       volume; it is averaged along axis 0 of the input window
       (presumably the time axis of zipline's window-by-asset arrays --
       confirm against your zipline version).

       Args:
           lookback (int): window length given to the filter
           amount (int): threshold the average must exceed (high-pass)

       Returns:
           A custom filter object

       Examples::

           # from library.pipelines.pipelines import average_volume_filter

           pipe = Pipeline()
           volume_filter = average_volume_filter(200, 1000000)
           pipe.set_screen(volume_filter)
    """

    class AvgDailyDollarVolumeTraded(CustomFilter):
        inputs = [EquityPricing.close, EquityPricing.volume]

        def compute(self, today, assets, out, close_price, volume):
            # `amount` comes from the enclosing factory call.
            traded_value = close_price * volume
            out[:] = np.mean(traded_value, axis=0) > amount

    volume_screen = AvgDailyDollarVolumeTraded(window_length=lookback)
    return volume_screen


def period_returns(lookback):
    """
       Build a pipeline factor holding the simple return of the close
       price across the lookback window.

       The return is ``last_row / first_row - 1`` of the close-price
       window handed to the factor.

       Args:
           lookback (int): window length given to the factor

       Returns:
           A custom factor object.

       Examples::

           # from library.pipelines.pipelines import period_returns
           pipe = Pipeline()
           momentum = period_returns(200)
           pipe.add(momentum,'momentum')
    """

    class SignalPeriodReturns(CustomFactor):
        inputs = [EquityPricing.close]

        def compute(self, today, assets, out, close_price):
            # First row vs. last row of the window.
            oldest, latest = close_price[0], close_price[-1]
            out[:] = latest / oldest - 1

    momentum_factor = SignalPeriodReturns(window_length=lookback)
    return momentum_factor


def initialize(context):
    '''
        Set up the strategy: store its parameters on the context,
        schedule the monthly run and attach the pipeline.
    '''
    # Parameters are stored on the context so the other callbacks can
    # read them. `lookback` is multiplied by 21 in
    # make_strategy_pipeline to get the window length.
    context.params = dict(lookback=12, percentile=0.1, min_volume=1E7)

    # Run `strategy` at the start of each month, one minute before the
    # market close.
    schedule_function(
        strategy,
        date_rules.month_start(),
        time_rules.market_close(minutes=1),
    )

    # Register the pipeline under the name generate_signals looks up.
    strategy_pipe = make_strategy_pipeline(context)
    attach_pipeline(strategy_pipe, name='strategy_pipeline')


def strategy(context, data):
    """
        Scheduled entry point: refresh the long/short selections, then
        trade towards them.
    """
    # Order matters: rebalance reads what generate_signals stored.
    for step in (generate_signals, rebalance):
        step(context, data)


def make_strategy_pipeline(context):
    """
        Build the strategy pipeline: a 'momentum' column of period
        returns, screened by the average dollar-volume filter.

        Args:
            context: strategy context; ``context.params`` must provide
                'lookback' and 'min_volume'.

        Returns:
            A Pipeline with one column named 'momentum' and the volume
            filter as its screen.
    """
    params = context.params

    # The same window (lookback * 21) drives both the filter and the
    # momentum factor.
    window = params['lookback'] * 21
    min_dollar_volume = params['min_volume']

    volume_filter = average_volume_filter(window, min_dollar_volume)
    momentum = period_returns(window)

    return Pipeline(columns={'momentum': momentum}, screen=volume_filter)


def generate_signals(context, data):
    """
        Pick the long and short baskets from the pipeline's momentum
        column and store them on the context.

        Assets with positive momentum are long candidates and assets
        with negative momentum are short candidates. The basket size is
        ``int(min(n_long, n_short) * percentile)``; the longs are the
        highest-momentum candidates and the shorts the lowest.

        Args:
            context: strategy context; reads ``context.params['percentile']``
                and writes ``context.long_securities`` and
                ``context.short_securities``.
            data: only ``data.current_dt`` is read, for the
                "no signals" message.

        If the pipeline output cannot be fetched, or the basket size is
        zero, both baskets are set to empty lists.
    """
    try:
        pipeline_results = pipeline_output('strategy_pipeline')
    except Exception:
        # Best-effort: trade nothing if the pipeline output is not
        # available. `Exception` (not a bare except) keeps
        # KeyboardInterrupt / SystemExit propagating.
        context.long_securities = []
        context.short_securities = []
        return

    p = context.params['percentile']
    momentum = pipeline_results

    # Masking leaves NaN where the condition fails; dropna removes
    # those rows. Both frames end up sorted ascending by momentum.
    long_candidates = momentum[momentum > 0].dropna().sort_values('momentum')
    short_candidates = momentum[momentum < 0].dropna().sort_values('momentum')

    n = int(min(len(long_candidates), len(short_candidates)) * p)

    if n == 0:
        print("{}, no signals".format(data.current_dt))
        context.long_securities = []
        context.short_securities = []
        # Must stop here: `index[-0:]` is the WHOLE index, so falling
        # through would make every positive-momentum asset a long.
        return

    context.long_securities = long_candidates.index[-n:]
    context.short_securities = short_candidates.index[:n]


def rebalance(context, data):
    """
        Trade towards the equal-weighted long/short baskets stored on
        the context.

        Does nothing when the long basket is empty. Otherwise every
        long gets ``0.5 / len(long basket)`` of the portfolio and every
        short the negative of that same weight. Held positions that are
        in neither basket are closed first.

        Args:
            context: strategy context; reads ``long_securities``,
                ``short_securities`` and ``portfolio.positions``.
            data: unused.
    """
    longs = context.long_securities
    long_count = len(longs)
    if long_count < 1:
        return

    shorts = context.short_securities
    per_name_weight = 0.5 / long_count

    # Close out anything held that is no longer in either basket.
    for held in context.portfolio.positions:
        still_wanted = held in longs or held in shorts
        if not still_wanted:
            order_target_percent(held, 0)

    # Longs first, then shorts, each at the same absolute weight.
    legs = ((longs, per_name_weight), (shorts, -per_name_weight))
    for basket, signed_weight in legs:
        for security in basket:
            order_target_percent(security, signed_weight)