Dask: How would I parallelize my code with dask delayed?
This is my first venture into parallel processing. I have been looking into Dask, but I am having trouble actually coding it.
I have looked at their examples and documentation, and I think dask.delayed would work best. I attempted to wrap my functions with delayed(function_name), or to add an @delayed decorator, but I can't seem to get it working properly. I preferred Dask over other methods since it is made in Python and for its (supposed) simplicity. I know dask doesn't work on the for loop itself, but they say it can work inside a loop.
My code passes files through a function that contains inputs to other functions, and looks like this:
from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames):
    name = name.split('.')[0]
    ....
Then I do some pre-processing, for example:
    preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)
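One pitfall with the unpacking on that line: a single Delayed object cannot be unpacked into two names unless dask is told how many outputs to expect, via the `nout` argument. A minimal sketch, using a hypothetical `split_pair` function standing in for `read_files_and_do_some_stuff`:

```python
from dask import delayed

def split_pair(x):
    # Hypothetical stand-in for a function that returns two values
    return x - 1, x + 1

# nout=2 tells dask the wrapped function returns two results,
# so the Delayed it produces can be unpacked directly
lazy_split = delayed(split_pair, nout=2)
low, high = lazy_split(10)
print(low.compute(), high.compute())  # 9 11
```

Without `nout`, the same unpacking raises a TypeError, because dask cannot know the length of the not-yet-computed result.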
Then I call a constructor and pass the pre_results into the function calls:
    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                                      input_data=pre_result1, model1=pre_result2)
What I am doing here is passing a file into a for loop, doing some pre-processing, and then passing the file into two models.
Thoughts or tips on how to parallelize this? I began getting odd errors and I have no idea how to fix the code. The code does work as is. I use a bunch of pandas dataframes, series, and numpy arrays, and I would prefer not to go back and change everything to work with dask.dataframes etc.
The code in my comments may be hard to read. Here it is in a more formatted way.
In the code below, when I type print(mean_squared_error) I just get: Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')
from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, prediction)
You need to call dask.compute to eventually compute the result. See the dask.delayed documentation.
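The build-graph-then-compute pattern can be sketched with a toy `inc` function (not the asker's code): each delayed call only records a task, and nothing runs until `.compute()`.

```python
import dask
from dask import delayed

@delayed
def inc(x):
    return x + 1

# Each call adds a node to the task graph; nothing has run yet
total = delayed(sum)([inc(i) for i in range(5)])
print(total)            # a Delayed object, not a number
print(total.compute())  # 15 — compute() actually executes the graph
```

This is exactly why `print(mean_squared_error)` above shows `Delayed('mean_squared_error-...')`: the value has not been computed yet.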
Sequential code
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)
    results.append(mean_squared_error)
Parallel code
import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)
results = dask.compute(*delayed_results)
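One detail worth noting in the loop above: `df['Close'][:-1]` works even though `df` is a `Delayed` object, because item access on a `Delayed` is itself lazy and simply adds another node to the graph. A small sketch with toy data:

```python
import dask
import pandas as pd

# Build the frame lazily from toy data
df = dask.delayed(pd.DataFrame)({'Close': [1.0, 2.0, 4.0]})

# Each getitem on a Delayed returns a new Delayed
prediction = df['Close'][:-1]   # still lazy
observed = df['Close'][1:]      # still lazy

pred, obs = dask.compute(prediction, observed)
print(list(pred), list(obs))
```

This is why the original pandas-style slicing survives the switch to delayed without rewriting anything to dask.dataframes.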
IMO, a cleaner solution than the accepted answer is this snippet:
from dask import compute, delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]
def compute_mse(file_name):
    df = pd.read_csv(file_name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    return mse(observed, prediction)
delayed_results = [delayed(compute_mse)(file_name) for file_name in filenames]
mean_squared_errors = compute(*delayed_results, scheduler="processes")
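The `scheduler="processes"` argument deserves a note: dask's default threaded scheduler has low overhead but is limited by the GIL for pure-Python work, while the process-based scheduler sidesteps the GIL at the cost of pickling arguments and results between processes. A sketch comparing the two on a toy task:

```python
import dask
from dask import delayed

@delayed
def square(x):
    return x * x

tasks = [square(i) for i in range(4)]

# Threads: low overhead; fine when the work releases the GIL
# (NumPy, pandas I/O, much of scikit-learn)
print(dask.compute(*tasks, scheduler="threads"))

# Processes: true parallelism for CPU-bound pure-Python code,
# but arguments/results are serialized between processes
print(dask.compute(*tasks, scheduler="processes"))
```

Since `pd.read_csv` plus an sklearn metric spends much of its time outside pure-Python bytecode, either scheduler can work here; measuring both on your own files is the safest way to choose.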