如何使用 Numba + Dask 正确并行化泛型代码
How to properly parallelize generic code with Numba + Dask
我不熟悉使用 Dask
和 Numba
来加速代码,我希望这可能是一个有价值的问题,让用户获得有关如何并行化代码的最佳实践的答案。我制作了一个包含 3 列的 pandas
数据框的通用测试用例。
一个通用函数将在框架中的 3 个向量上实现,以表示在数据分析中可能进行的变换类型:前两列求平方、相加,然后求平方根,然后计算一个布尔值,将结果与第 3 列进行比较。
我实现了 4 个测试用例:(a) pandas
应用,(b) Dask
,(c) Numba
,以及 (d) Dask
和Numba
在一起。
Numba
效果很好。我所有的问题都与 Dask
有关。以下是我遇到的问题:
Dask
,无论我制作多大的矢量,速度都比较慢。我可能不完全了解如何以及何时计算数据帧的某些部分,或者如何使其正确并行化。它比常规申请慢。
- 如何正确使用Dask进行并行化?我把它画成 4 个分区,我有 2 个核心处理器,但你实际上如何决定如何格式化它?
# Practice parallelizing
from dask import dataframe as dd
from numba import jit
import pandas as pd
import numpy as np
import time
# df is going to be the regular dataframe
df = pd.DataFrame(np.random.random(size=(1000000,3))*100,columns=['col1','col2','col3'])
# ddf is the dask dataframe
ddf = dd.from_pandas(df,npartitions=4)
# Check the distance regular (probably wouldn't write like this but doing for symmetry)
def check_dist(col1,col2,col3):
dist = np.sqrt(col1**2+col2**2)
check = dist < col3
return check
# Jit
@jit(nopython=True)
def check_dist_fast(col1,col2,col3):
dist = np.sqrt(col1**2+col2**2)
check = dist < col3
return check
#####################################
# Regular Python Apply
#####################################
t0 = time.time()
df['col4'] = df.apply(lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
t1 = time.time()-t0
print("Regular pandas took",t1)
df = df.drop('col4',axis=1)
#####################################
# Dask Apply
#####################################
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(
lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
).compute()
t1 = time.time()-t0
print("Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)
#####################################
# Numba Pandas
#####################################
t0 = time.time()
df['col4'] = check_dist_fast(df.col1.to_numpy(),df.col2.to_numpy(),df.col3.to_numpy())
t1 = time.time()-t0
print("Numba pandas took",t1)
df = df.drop('col4',axis=1)
#####################################
# Numba + Jit Pandas
#####################################
t0 = time.time()
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(lambda x:
check_dist_fast(x.col1,x.col2,x.col3),axis=1)).compute()
t1 = time.time()-t0
print("Numba Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)
最后,您应该了解哪些其他最佳实践。这个想法是将其发送到某种具有许多节点的集群。
时间是:
- 常规 pandas 花了 150.6191689968109
- Dask pandas 花费了 153.70575094223022
- Numba pandas 花费了 0.710655927658081
- Numba Dask pandas 花费了 139.57402181625366
我认为 dask 太慢了,因为你正在使用以下方法计算一个系列:
ddf.map_partitions(
lambda d: d.apply(lambda x: check_dist(x.col1,x.col2,x.col3), axis=1)
).compute()
然后将其分配给一个新列,这样 dask 就无法并行化该过程。以下代码执行相同的操作,但运行时间为 0.06 秒:
#####################################
# Dask Assign
#####################################
t0 = time.time()
ddf = ddf.assign(col4=lambda x: check_dist(x.col1,x.col2,x.col3))
ddf.compute()
t1 = time.time()-t0
print("Dask using Assign took",t1)
ddf = ddf.drop('col4',axis=1)
我建议您查看 dask 文档中的最佳实践部分。
希望对您有所帮助!
我不熟悉使用 Dask
和 Numba
来加速代码,我希望这可能是一个有价值的问题,让用户获得有关如何并行化代码的最佳实践的答案。我制作了一个包含 3 列的 pandas
数据框的通用测试用例。
一个通用函数将在框架中的 3 个向量上实现,以表示在数据分析中可能进行的变换类型:前两列求平方、相加,然后求平方根,然后计算一个布尔值,将结果与第 3 列进行比较。
我实现了 4 个测试用例:(a) pandas
应用,(b) Dask
,(c) Numba
,以及 (d) Dask
和Numba
在一起。
Numba
效果很好。我所有的问题都与 Dask
有关。以下是我遇到的问题:
Dask
,无论我制作多大的矢量,速度都比较慢。我可能不完全了解如何以及何时计算数据帧的某些部分,或者如何使其正确并行化。它比常规申请慢。- 如何正确使用Dask进行并行化?我把它画成 4 个分区,我有 2 个核心处理器,但你实际上如何决定如何格式化它?
# Practice parallelizing
from dask import dataframe as dd
from numba import jit
import pandas as pd
import numpy as np
import time
# df is going to be the regular dataframe
df = pd.DataFrame(np.random.random(size=(1000000,3))*100,columns=['col1','col2','col3'])
# ddf is the dask dataframe
ddf = dd.from_pandas(df,npartitions=4)
# Check the distance regular (probably wouldn't write like this but doing for symmetry)
def check_dist(col1,col2,col3):
dist = np.sqrt(col1**2+col2**2)
check = dist < col3
return check
# Jit
@jit(nopython=True)
def check_dist_fast(col1,col2,col3):
dist = np.sqrt(col1**2+col2**2)
check = dist < col3
return check
#####################################
# Regular Python Apply
#####################################
t0 = time.time()
df['col4'] = df.apply(lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
t1 = time.time()-t0
print("Regular pandas took",t1)
df = df.drop('col4',axis=1)
#####################################
# Dask Apply
#####################################
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(
lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
).compute()
t1 = time.time()-t0
print("Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)
#####################################
# Numba Pandas
#####################################
t0 = time.time()
df['col4'] = check_dist_fast(df.col1.to_numpy(),df.col2.to_numpy(),df.col3.to_numpy())
t1 = time.time()-t0
print("Numba pandas took",t1)
df = df.drop('col4',axis=1)
#####################################
# Numba + Jit Pandas
#####################################
t0 = time.time()
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(lambda x:
check_dist_fast(x.col1,x.col2,x.col3),axis=1)).compute()
t1 = time.time()-t0
print("Numba Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)
最后,您应该了解哪些其他最佳实践。这个想法是将其发送到某种具有许多节点的集群。
时间是:
- 常规 pandas 花了 150.6191689968109
- Dask pandas 花费了 153.70575094223022
- Numba pandas 花费了 0.710655927658081
- Numba Dask pandas 花费了 139.57402181625366
我认为 dask 太慢了,因为你正在使用以下方法计算一个系列:
ddf.map_partitions(
lambda d: d.apply(lambda x: check_dist(x.col1,x.col2,x.col3), axis=1)
).compute()
然后将其分配给一个新列,这样 dask 就无法并行化该过程。以下代码执行相同的操作,但运行时间为 0.06 秒:
#####################################
# Dask Assign
#####################################
t0 = time.time()
ddf = ddf.assign(col4=lambda x: check_dist(x.col1,x.col2,x.col3))
ddf.compute()
t1 = time.time()-t0
print("Dask using Assign took",t1)
ddf = ddf.drop('col4',axis=1)
我建议您查看 dask 文档中的最佳实践部分。
希望对您有所帮助!