为什么dask不并行执行
Why dask doesnt execute in parallel
谁能指出我在以下 dask 实现中做错了什么,因为它似乎没有使用多核。
[更新了可重现的代码]
使用dask的代码:
bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))
def calculate_feature_stats(bookingID):
curr_book_data = book_data
row = list()
row.append(bookingID)
row.append(curr_book_data.min())
row.append(curr_book_data.max())
row.append(curr_book_data.std())
row.append(curr_book_data.mean())
return row
calculate_feature_stats = dask.delayed(calculate_feature_stats)
rows = []
for bookid in bookingID.tolist():
row = calculate_feature_stats(bookid)
rows.append(row)
start = time.time()
rows = dask.persist(*rows)
end = time.time()
print(end - start) # Execution time = 16s in my machine
正常实现的代码没有 dask :
bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))
def calculate_feature_stats_normal(bookingID):
curr_book_data = book_data
row = list()
row.append(bookingID)
row.append(curr_book_data.min())
row.append(curr_book_data.max())
row.append(curr_book_data.std())
row.append(curr_book_data.mean())
return row
rows = []
start = time.time()
for bookid in bookingID.tolist():
row = calculate_feature_stats_normal(bookid)
rows.append(row)
end = time.time()
print(end - start) # Execution time = 4s in my machine
所以,没有 dask 实际上更快,这怎么可能?
回答
扩展评论。您应该考虑使用 dask 大约有 1 毫秒的开销(请参阅 doc),因此如果您的计算时间短于此,那么 dask 不值得麻烦。
针对您的具体问题,我可以想到两种可能的现实世界场景:
1. 一个大数据框,其中有一列名为 bookingID
,另一列为 value
2. 每个 bookingID
一个不同的文件
在第二种情况下,您可以从 播放,而对于第一种情况,您可以按照以下步骤进行:
import dask.dataframe as dd
import numpy as np
import pandas as pd
# create dummy df
df = []
for i in range(10_000):
df.append(pd.DataFrame({"id":i,
"value":np.random.rand(1000)}))
df = pd.concat(df, ignore_index=True)
df = df.sample(frac=1).reset_index(drop=True)
df.to_parquet("df.parq")
Pandas
%%time
df = pd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":{"min", "max", "std", "mean"}})
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)
CPU times: user 1.65 s, sys: 316 ms, total: 1.96 s
Wall time: 1.08 s
达斯克
%%time
df = dd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":["min", "max", "std", "mean"]}).compute()
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)
CPU times: user 4.94 s, sys: 427 ms, total: 5.36 s
Wall time: 3.94 s
最后的想法
在这种情况下,如果 df
不适合内存,dask 就开始有意义了。
谁能指出我在以下 dask 实现中做错了什么,因为它似乎没有使用多核。
[更新了可重现的代码]
使用dask的代码:
bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))
def calculate_feature_stats(bookingID):
curr_book_data = book_data
row = list()
row.append(bookingID)
row.append(curr_book_data.min())
row.append(curr_book_data.max())
row.append(curr_book_data.std())
row.append(curr_book_data.mean())
return row
calculate_feature_stats = dask.delayed(calculate_feature_stats)
rows = []
for bookid in bookingID.tolist():
row = calculate_feature_stats(bookid)
rows.append(row)
start = time.time()
rows = dask.persist(*rows)
end = time.time()
print(end - start) # Execution time = 16s in my machine
正常实现的代码没有 dask :
bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))
def calculate_feature_stats_normal(bookingID):
curr_book_data = book_data
row = list()
row.append(bookingID)
row.append(curr_book_data.min())
row.append(curr_book_data.max())
row.append(curr_book_data.std())
row.append(curr_book_data.mean())
return row
rows = []
start = time.time()
for bookid in bookingID.tolist():
row = calculate_feature_stats_normal(bookid)
rows.append(row)
end = time.time()
print(end - start) # Execution time = 4s in my machine
所以,没有 dask 实际上更快,这怎么可能?
回答
扩展评论。您应该考虑使用 dask 大约有 1 毫秒的开销(请参阅 doc),因此如果您的计算时间短于此,那么 dask 不值得麻烦。
针对您的具体问题,我可以想到两种可能的现实世界场景:
1. 一个大数据框,其中有一列名为 bookingID
,另一列为 value
2. 每个 bookingID
在第二种情况下,您可以从
import dask.dataframe as dd
import numpy as np
import pandas as pd
# create dummy df
df = []
for i in range(10_000):
df.append(pd.DataFrame({"id":i,
"value":np.random.rand(1000)}))
df = pd.concat(df, ignore_index=True)
df = df.sample(frac=1).reset_index(drop=True)
df.to_parquet("df.parq")
Pandas
%%time
df = pd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":{"min", "max", "std", "mean"}})
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)
CPU times: user 1.65 s, sys: 316 ms, total: 1.96 s
Wall time: 1.08 s
达斯克
%%time
df = dd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":["min", "max", "std", "mean"]}).compute()
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)
CPU times: user 4.94 s, sys: 427 ms, total: 5.36 s
Wall time: 3.94 s
最后的想法
在这种情况下,如果 df
不适合内存,dask 就开始有意义了。