使用Dask加速参数测试
Speed up parameter testing using Dask
我有一个包含大约 10 列的时间序列数据框,我在其中对时间序列执行操作以 return 策略数据的结果。我想测试 2 个参数,因为它们可能会或可能不会相互影响。独立测试时,每个 运行 每个单元花费超过 10 秒(总计 运行 超过 6.5 小时),我希望加快速度……我一直在阅读有关 dask 的内容,似乎它是正确的模块。
我当前的代码使用嵌套循环遍历每个参数范围。我知道它可以并行,因为每天的数据是互斥的。
这是代码:
amount1=np.arange(.001,.03,.0005)
amount2=np.arange(.001,.03,.0005)
def getResults(df,amount1,amount2):
final_results=[]
for x in tqdm(amount1):
for y in amount2:
df1=None
df1=function1(df.copy(), x, y ) #takes about 2sec.
df1=function2(df1) #takes about 2sec.
df1=function3(df1) #takes about 3sec.
final_results.append([x,y,df1['results'].iloc[-1]])
return final_results
更新:
所以看起来改进应该通过调整函数来从调用中删除迭代并创建作业列表(我的理解。这是我到目前为止的位置。我可能需要移动我的df 到 dask 数据帧,以便数据可以分成更小的块。问题是我是否将函数 1、2 和 3 函数保留为 pandas 向量操作,或者它们是否需要移动以完成 dask 函数?
def getResults(df,amount):
df1=None
df1=dsk.delayed(function1)(df,amount[0],amount[1] )
df1=dsk.delayed(function2)(df1)
df1=dsk.delayed(function2)(df1)
return [amount[0],amount[1],df1['results'].iloc[-1]]
#Create a list of processes from jobs. jobs is a list of tuples that replaces the iteration.
processes =[getResults(df,items) for items in jobs]
#Create a process list of results
results=[]
for i in range(len(processes):
results.append(processes[i])
您可能想使用任一 dask.delayed or the concurrent.futures 界面。
像下面这样的东西可能会很好用(未经测试,我建议您阅读上面引用的文档以了解它在做什么)。
def getResults(df,amount1,amount2):
final_results=[]
for x in amount1:
for y in amount2:
df1=None
df1=dask.delayed(function1)(df.copy(), x, y )
df1=dask.delayed(function2)(df1)
df1=dask.delayed(function3)(df1)
final_results.append([x,y,df1['results'].iloc[-1]])
return final_results
out = getResults(df, amount1, amount2)
result = delayed(out).compute()
此外,如果可以避免,我会避免调用 df.copy()
。理想情况下,function1 不会改变输入数据。
我有一个包含大约 10 列的时间序列数据框,我在其中对时间序列执行操作以 return 策略数据的结果。我想测试 2 个参数,因为它们可能会或可能不会相互影响。独立测试时,每个 运行 每个单元花费超过 10 秒(总计 运行 超过 6.5 小时),我希望加快速度……我一直在阅读有关 dask 的内容,似乎它是正确的模块。
我当前的代码使用嵌套循环遍历每个参数范围。我知道它可以并行,因为每天的数据是互斥的。
这是代码:
amount1=np.arange(.001,.03,.0005)
amount2=np.arange(.001,.03,.0005)
def getResults(df,amount1,amount2):
final_results=[]
for x in tqdm(amount1):
for y in amount2:
df1=None
df1=function1(df.copy(), x, y ) #takes about 2sec.
df1=function2(df1) #takes about 2sec.
df1=function3(df1) #takes about 3sec.
final_results.append([x,y,df1['results'].iloc[-1]])
return final_results
更新:
所以看起来改进应该通过调整函数来从调用中删除迭代并创建作业列表(我的理解。这是我到目前为止的位置。我可能需要移动我的df 到 dask 数据帧,以便数据可以分成更小的块。问题是我是否将函数 1、2 和 3 函数保留为 pandas 向量操作,或者它们是否需要移动以完成 dask 函数?
def getResults(df,amount):
df1=None
df1=dsk.delayed(function1)(df,amount[0],amount[1] )
df1=dsk.delayed(function2)(df1)
df1=dsk.delayed(function2)(df1)
return [amount[0],amount[1],df1['results'].iloc[-1]]
#Create a list of processes from jobs. jobs is a list of tuples that replaces the iteration.
processes =[getResults(df,items) for items in jobs]
#Create a process list of results
results=[]
for i in range(len(processes):
results.append(processes[i])
您可能想使用任一 dask.delayed or the concurrent.futures 界面。
像下面这样的东西可能会很好用(未经测试,我建议您阅读上面引用的文档以了解它在做什么)。
def getResults(df,amount1,amount2):
final_results=[]
for x in amount1:
for y in amount2:
df1=None
df1=dask.delayed(function1)(df.copy(), x, y )
df1=dask.delayed(function2)(df1)
df1=dask.delayed(function3)(df1)
final_results.append([x,y,df1['results'].iloc[-1]])
return final_results
out = getResults(df, amount1, amount2)
result = delayed(out).compute()
此外,如果可以避免,我会避免调用 df.copy()
。理想情况下,function1 不会改变输入数据。