计算 pandas 数据帧的子集时,dask 广播变量因密钥错误而失败
dask broadcast variable fails with key error when calculating subset of pandas dataframe
我有一个 pandas 数据框,想对每个组应用一个代价高昂的操作。因此,我想使用 dask 并行化这个任务。
应广播初始数据帧。
但计算仅失败:
<Future: error, key: iterated_costly_function-4aff5e66b6af1c073dc2cfd0d2dbb6f3>
<Future: error, key: iterated_costly_function-74d26e42c758a8cc177047d7a0f49ff4>
代码如下:
import pandas as pd
df = pd.DataFrame({'foo':[1,2,3,4,5,6], 'bar':['a', 'a', 'b', 'b', 'a', 'b']})
display(df)
unique_values = df.bar.unique()
print(unique_values)
for v in unique_values:
subset_df = df[df.bar == v]
display(subset_df)
现在使用 dask 时:
import pandas as pd
from tqdm import tqdm
tqdm.pandas()
from time import sleep
from dask.distributed import Client, progress
from dask.distributed import wait, as_completed
from dask.distributed import Variable
from dask import delayed
#
client = Client()#threads_per_worker=8, n_workers=2)
client
remote_df = client.scatter(df, broadcast=True)
global_var = Variable(name="remote_data")
global_var.set(remote_df)
def iterated_costly_function(v):
df = global_var.get()
subset_df = df[df.bar == v]
#subset_df = apply_some_costly_function(subset_df, x=1, y=2, z=3)
# not implemented here for sake of simplicity
sleep(3)
return subset_df#.values # make it return something
futures = client.map(iterated_costly_function, unique_values)
wait(futures)
for f in tqdm(futures):
print(f)
我尝试访问广播变量的方式有什么问题?
我会这样写你的函数
def iterated_costly_function(v):
df = Variable(name="remote_data").get().result()
subset_df = df[df.bar == v]
sleep(3)
return subset_df#.values
哪里
- 我们使用其名称显式实例化
Variable
,而不是将其传递到闭包中(您可以将名称字符串作为参数传递)
- 因为数据实际上是一个未来,你需要
.result()
才能得到它的价值。
我有一个 pandas 数据框,想对每个组应用一个代价高昂的操作。因此,我想使用 dask 并行化这个任务。 应广播初始数据帧。 但计算仅失败:
<Future: error, key: iterated_costly_function-4aff5e66b6af1c073dc2cfd0d2dbb6f3>
<Future: error, key: iterated_costly_function-74d26e42c758a8cc177047d7a0f49ff4>
代码如下:
import pandas as pd
df = pd.DataFrame({'foo':[1,2,3,4,5,6], 'bar':['a', 'a', 'b', 'b', 'a', 'b']})
display(df)
unique_values = df.bar.unique()
print(unique_values)
for v in unique_values:
subset_df = df[df.bar == v]
display(subset_df)
现在使用 dask 时:
import pandas as pd
from tqdm import tqdm
tqdm.pandas()
from time import sleep
from dask.distributed import Client, progress
from dask.distributed import wait, as_completed
from dask.distributed import Variable
from dask import delayed
#
client = Client()#threads_per_worker=8, n_workers=2)
client
remote_df = client.scatter(df, broadcast=True)
global_var = Variable(name="remote_data")
global_var.set(remote_df)
def iterated_costly_function(v):
df = global_var.get()
subset_df = df[df.bar == v]
#subset_df = apply_some_costly_function(subset_df, x=1, y=2, z=3)
# not implemented here for sake of simplicity
sleep(3)
return subset_df#.values # make it return something
futures = client.map(iterated_costly_function, unique_values)
wait(futures)
for f in tqdm(futures):
print(f)
我尝试访问广播变量的方式有什么问题?
我会这样写你的函数
def iterated_costly_function(v):
df = Variable(name="remote_data").get().result()
subset_df = df[df.bar == v]
sleep(3)
return subset_df#.values
哪里
- 我们使用其名称显式实例化
Variable
,而不是将其传递到闭包中(您可以将名称字符串作为参数传递) - 因为数据实际上是一个未来,你需要
.result()
才能得到它的价值。