VPN Indicator ThinkScript 到 Python

VPN Indicator ThinkScript to Python

第一次尝试将 ThinkScript 转换为 Python,我认为我的逻辑是正确的,但我遗漏了一些东西,因为指标的两个图不匹配。

正在尝试将 VPNIndicator 的 ThinkScript 转换为 Python 实现。寻找精通两种语言的人在这里做出贡献。

首先,ThinkorSwim 中的指标图如下所示(底部):

所以我尝试使用 matplotlib finance 复制该图,但首先我需要将 ThinkScript 转换为 Python,我已在此处尝试过:

import mplfinance as mpf
import pandas as pd 
import numpy as np
import talib

def VPN_Indicator(df, params):

    # def atr = WildersAverage(TrueRange(high,  close,  low), length);
    df['H-L'] = df['High'] - df['Low']
    df['H-C1'] = df['High'] - df['Close'].shift()
    df['C1-L'] = df['Close'].shift() - df['Low']
    df['TrueRange'] = df[['H-L','H-C1','C1-L']].max(axis=1)
    df['WildersATR'] = df['TrueRange'].ewm(alpha=1.0 / params['length'], adjust=False).mean()

    # def diff = hlc3 - hlc3[1];
    df['Diff'] = ((df['High'] + df['Low'] + df['Close']) / 3) - ((df['High'].shift() + df['Low'].shift() + df['Close'].shift()) / 3) # Forward peak here?

    # def vp = Sum(if diff > factor * atr then volume else 0, length);
    df['VP_Helper'] = np.where(df['Diff'] > params['factor'] * df['WildersATR'], df['Volume'], 0)
    df['VP'] = df['VP_Helper'].rolling(params['length']).sum()

    # def vn = Sum(if diff < -factor * atr then volume else 0, length);
    df['VN_Helper'] = np.where(df['Diff'] < -params['factor'] * df['WildersATR'], df['Volume'], 0)
    df['VN'] = df['VN_Helper'].rolling(params['length']).sum()

    # plot VPN = ExpAverage(100 * (vp - vn) / Sum(volume, length), emaLength);
    df['RollingVol'] = df['Volume'].rolling(params['length']).sum()
    df['VPN'] = talib.EMA(100 * (df['VP'] - df['VN']) / df['RollingVol'], timeperiod=params['emaLength'])

    # plot VPNAvg = MovingAverage(averageType, VPN, averageLength);
    if params['averageType'] in ['simple','sma','SMA','SIMPLE']:
        df['VPNAvg'] = talib.SMA(df['VPN'], timeperiod=params['averageLength'])
    
    # plot CriticalLevel = criticalValue;
    df['CriticalLevel'] = params['criticalValue']

    # VPN.DefineColor("Above", Color.UPTICK);
    # VPN.DefineColor("Below", Color.DOWNTICK);
    # VPN.AssignValueColor(if VPN > CriticalLevel then VPN.Color("Above") else VPN.Color("Below"));
    # VPNAvg.SetDefaultColor(GetColor(7));
    # CriticalLevel.SetDefaultColor(GetColor(1));

    # Gimicks, don't need the top bit for now

    return df


params = {
    "length": 30,
    "emaLength": 3,
    "averageLength": 30,
    "factor": 0.1,
    "criticalValue": 10,
    "averageType": "simple"

}

# Import a 1min dataset and rename columns as necessary
df = pd.read_csv("SPY.csv").iloc[-2000:,:]
df['time'] = pd.to_datetime(df['time'])
df = df.set_index('time')
df = df.rename(columns={'open':'Open', 'high':'High', 'low':"Low", "close": "Close", "volume": "Volume"})
df = VPN_Indicator(df, params)

# Plot the results
apds = [ mpf.make_addplot((df['CriticalLevel']), panel=2, color='g'),
         mpf.make_addplot((df['VPN']), panel=2, color='g'),
         mpf.make_addplot((df['VPNAvg']), panel=2, color='g'),
       ]

mpf.plot(df[['Open', 'High', 'Low', 'Close', 'Volume']], addplot=apds, figscale=1.2, volume=True)

... 结果如下所示:

... 这很接近,但峰值与 ThinkOrSwim 图不一致。所以我想从了解这些语言的人那里知道我可能会离开哪里?谢谢!

尝试使用它来计算 ATR。这给出了与 TOS 相同的输出。

import numpy as np

def ema(arr, periods=14, weight=1, init=None):
    leading_na = np.where(~np.isnan(arr))[0][0]
    arr = arr[leading_na:]
    alpha = weight / (periods + (weight-1))
    alpha_rev = 1 - alpha
    n = arr.shape[0]
    pows = alpha_rev**(np.arange(n+1))
    out1 = np.array([])
    if 0 in pows:
        out1 = ema(arr[:int(len(arr)/2)], periods)
        arr = arr[int(len(arr)/2) - 1:]
        init = out1[-1]
        n = arr.shape[0]
        pows = alpha_rev**(np.arange(n+1))
    scale_arr = 1/pows[:-1]
    if init:
        offset = init * pows[1:]
    else:
        offset = arr[0]*pows[1:]
    pw0 = alpha*alpha_rev**(n-1)
    mult = arr*pw0*scale_arr
    cumsums = mult.cumsum()
    out = offset + cumsums*scale_arr[::-1]
    out = out[1:] if len(out1) > 0 else out
    out = np.concatenate([out1, out])
    out[:periods] = np.nan
    out = np.concatenate(([np.nan]*leading_na, out))
    return out


def atr(highs, lows, closes, periods=14, ema_weight=1):
    hi = np.array(highs)
    lo = np.array(lows)
    c = np.array(closes)
    tr = np.vstack([np.abs(hi[1:]-c[:-1]),
                    np.abs(lo[1:]-c[:-1]),
                    (hi-lo)[1:]]).max(axis=0)
    atr = ema(tr, periods=periods, weight=ema_weight)
    atr = np.concatenate([[np.nan], atr])
    return atr