Unable to return a specific output from multiprocessing pool
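I'm trying to use the Python multiprocessing library to call a function (calc_indicator) that takes a list of string names of ta-lib technical indicators, and then calls another function (technical_indicators) to compute the values for each name in the list passed to the first function (calc_indicator). This is the output I want:
When I run the code below: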
```python
import multiprocessing as mp
import pandas as pd
import numpy as np
from talib import abstract

dataset = pd.read_csv('Data/Currencies/COST.csv')
working_frame = dataset.drop(['Date', 'Adj Close'],axis=1)

def technical_indicators(currency_dataframe, indicator):
    nothing_found = 'Indicator Not Found'
    inputs = {
        'open':currency_dataframe['Open'],
        'high':currency_dataframe['High'],
        'low':currency_dataframe['Low'],
        'close':currency_dataframe['Close'],
        'volume':currency_dataframe['Volume']
    }
    DEMA = abstract.DEMA(inputs, timeperiod=20)
    EMA = abstract.EMA(inputs, timeperiod=20)
    KAMA = abstract.KAMA(inputs, timeperiod=20)
    MA = abstract.MA(inputs, timeperiod=20, matype=0)
    ATR = abstract.ATR(inputs, timeperiod=20)
    NATR = abstract.NATR(inputs, timeperiod=20)
    TRANGE = abstract.TRANGE(inputs)
    if(indicator == 'DEMA'):
        return DEMA
    elif(indicator == 'EMA'):
        return EMA
    elif(indicator == 'KAMA'):
        return KAMA
    elif(indicator == 'MA'):
        return MA
    elif(indicator == 'ATR'):
        return ATR
    elif(indicator == 'NATR'):
        return NATR
    elif(indicator == 'TRANGE'):
        return TRANGE
    else:
        return nothing_found

list0 = ['DEMA', 'EMA', 'KAMA', 'MA']
list1 = ['ATR', 'NATR', 'TRANGE']
calc_frame = pd.DataFrame()

def calc_indicator(data_list):
    for i in range(len(data_list)):
        tindicator = technical_indicators(working_frame, data_list[i])
        calc_frame[data_list[i]] = tindicator
    return calc_frame

cal_ = calc_indicator(list0)
pool = mp.Pool(mp.cpu_count())
res0 = pool.map(calc_indicator, list0)
res1 = pool.map(calc_indicator, list1)
```
I get this output:
```
D
E
K
M
M
A
E
A
M
A
M
A
A
A
T
N
T
R
A
A
N
R
G
T
E
R
```
Link to the data I used: daily prices
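The first problem is that your calc_indicator function expects a list of strings. But the pool.map() API consumes the list itself and calls calc_indicator() once per element, with a single string each time (e.g. calc_indicator('DEMA')). So calc_indicator ends up indexing into the characters of the string instead of into a list.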
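A minimal sketch of the first problem, independent of ta-lib: `pool.map(f, iterable)` calls `f` once per element, just like the built-in `map`. The `calc_indicator` below is a simplified, hypothetical stand-in for the question's function (it only collects `data_list[i]`), and the pool size of 2 is arbitrary.

```python
import multiprocessing as mp

def calc_indicator(data_list):
    # Hypothetical stand-in: written for a list of names, it just
    # collects data_list[i] for every index, like the question's loop
    return [data_list[i] for i in range(len(data_list))]

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        # Each element is passed to its own call, so the function
        # receives 'DEMA', then 'EMA', and loops over their characters
        print(pool.map(calc_indicator, ['DEMA', 'EMA']))
        # [['D', 'E', 'M', 'A'], ['E', 'M', 'A']]

        # Wrapping the list in another list gives one call the whole list
        print(pool.map(calc_indicator, [['DEMA', 'EMA']]))
        # [['DEMA', 'EMA']]
```

Passing a list of lists would fix the argument problem, but not the shared-state problem described next.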
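The second problem is that you are trying to update a single object, calc_frame, from multiple child processes. But each child process has its own memory space, so calc_frame in the main process is not affected by anything the children do to their copies.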
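A small sketch of the second problem, using a plain dict as a hypothetical stand-in for the global calc_frame DataFrame so it runs without pandas or ta-lib. Writes made inside workers stay in the workers; only the return values of `pool.map()` reach the parent.

```python
import multiprocessing as mp

calc_frame = {}  # stand-in for the question's global calc_frame

def add_column(name):
    # Runs in a worker process: this writes to the worker's own copy
    # of calc_frame, which the parent process never sees
    calc_frame[name] = len(name)
    # Only the return value is sent back to the parent
    return name, len(name)

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        returned = pool.map(add_column, ['DEMA', 'EMA'])
    print(calc_frame)  # {}
    print(returned)    # [('DEMA', 4), ('EMA', 3)]
```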
Instead, have the child processes return the result of technical_indicators() through pool.map(), and iterate over what pool.map() returns to update calc_frame with each result in turn:
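```python
def one_calc_indicator(indicator):
    # Runs in a worker process; return the name with its values so the
    # parent knows which column each result belongs to
    return indicator, technical_indicators(working_frame, indicator)

if __name__ == '__main__':
    # Only the parent creates the pool and writes to calc_frame
    with mp.Pool(mp.cpu_count()) as pool:
        for indicator, result in pool.map(one_calc_indicator, list0):
            calc_frame[indicator] = result
```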
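For anyone without ta-lib or the CSV, here is a self-contained sketch of the same pattern. Assumptions: synthetic closing prices replace COST.csv, and `stand_in_indicator` is a hypothetical helper that uses pandas `rolling().mean()` and `ewm().mean()` as rough substitutes for ta-lib's MA and EMA (the EMA is seeded differently from ta-lib's, so values will not match exactly). It also computes only the requested indicator rather than all of them on every call.

```python
import multiprocessing as mp

import numpy as np
import pandas as pd

# Synthetic closing prices standing in for the COST.csv data
close = pd.Series(np.linspace(100.0, 150.0, 60), name='Close')

def stand_in_indicator(indicator):
    # Hypothetical pandas substitutes for the ta-lib calls
    if indicator == 'MA':
        return close.rolling(window=20).mean()
    if indicator == 'EMA':
        return close.ewm(span=20, adjust=False).mean()
    raise ValueError(f'Indicator not found: {indicator}')

def one_calc_indicator(indicator):
    # Return the name alongside the values so the parent knows the column
    return indicator, stand_in_indicator(indicator)

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        # Build the frame in the parent from the (name, Series) pairs
        calc_frame = pd.DataFrame(dict(pool.map(one_calc_indicator, ['MA', 'EMA'])))
    print(calc_frame.tail())
```

Building the DataFrame once from a dict of (name, Series) pairs avoids mutating a global at all.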