在 Dask 中 join/lookup/map 列值的最有效方法?
Most efficient way to join/lookup/map column values in Dask?
给定一个 Dask DataFrame,我试图找到最有效的方法来应用静态值查找。
示例问题:我的数据有一个列 "user_id"
,其中有四个可能的值 [4823, 1292, 9634, 7431]
。我想将这些值映射到 [0, 1, 2, 3]
并将结果存储为新列 "user_id_mapped"
.
在 Dask 中实现此目的最有效的方法是什么?一种可能性是将主 df
左连接到 lookup_df
,但连接是一个相当复杂的操作。即使在普通 Pandas 中,基于索引的解决方案通常也比 join/merge 快得多,例如:
N = 100000
user_ids = [4823, 1292, 9634, 7431]
df = pd.DataFrame({
"user_id": np.random.choice(user_ids, size=N),
"dummy": np.random.uniform(size=N),
})
id_lookup_series = pd.Series(data=[0, 1, 2, 3], index=user_ids)
df["user_id_mapped"] = id_lookup_series[df["user_id"]].reset_index(drop=True)
我无法将这种方法转移到 Dask,因为静态 id_lookup_series
是一个普通的 Pandas 系列,而索引 df["user_id"]
是一个 Dask 系列。是否可以在 Dask 中执行这种快速连接?
Pandas 解决方案
如果将 Pandas 系列转换为 DataFrame,则可以使用合并
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: N = 100000
In [4]: user_ids = [4823, 1292, 9634, 7431]
In [5]: df = pd.DataFrame({
...: "user_id": np.random.choice(user_ids, size=N),
...: "dummy": np.random.uniform(size=N),
...: })
...:
...: id_lookup_series = pd.Series(data=[0, 1, 2, 3], index=user_ids)
...:
In [6]: result = df.merge(id_lookup_series.to_frame(), left_on='user_id', right_
...: index=True)
In [7]: result.head()
Out[7]:
dummy user_id 0
0 0.416698 1292 1
1 0.053371 1292 1
6 0.407371 1292 1
14 0.772367 1292 1
18 0.958009 1292 1
Dask Dataframe 解决方案
以上所有内容在 Dask.dataframe 中也能正常工作。我不确定您是否提前知道用户 ID,所以我添加了一个步骤来计算它们。
In [1]: import numpy as np
In [2]: import pandas as pd
N
In [3]: N = 100000
In [4]: user_ids = [4823, 1292, 9634, 7431]
In [5]: df = pd.DataFrame({
...: "user_id": np.random.choice(user_ids, size=N),
...: "dummy": np.random.uniform(size=N),
...: })
In [6]: import dask.dataframe as dd
In [7]: ddf = dd.from_pandas(df, npartitions=10)
In [8]: user_ids = ddf.user_id.drop_duplicates().compute()
In [9]: id_lookup_series = pd.Series(list(range(len(user_ids))), index=user_ids.values)
In [10]: result = ddf.merge(id_lookup_series.to_frame(), left_on='user_id', right_index=True)
In [11]: result.head()
Out[11]:
dummy user_id 0
0 0.364693 4823 0
5 0.934778 4823 0
14 0.970289 4823 0
15 0.561710 4823 0
21 0.838962 4823 0
我不确定为什么提供的代码如此复杂。根据我在您的示例问题描述中读到的内容,您需要将一组值替换为另一组值,因此您将 Series.replace(to_replace={})
方法与 Dask.DataFrame.map_partitions()
:
结合使用
def replacer(df, to_replace):
df['user_id_mapped'] = df['user_id'].replace(to_replace=to_replace)
return df
new_dask_df = dask_df.map_partitions(
replacer,
to_replace={4823: 0, 1292: 1, 9634: 2, 7431: 3}
)
P.S。您可能想了解 map_partitions
的 meta
参数,并考虑将代码组织成 class 以使其更好并避免闭包,但这是另一个主题。
给定一个 Dask DataFrame,我试图找到最有效的方法来应用静态值查找。
示例问题:我的数据有一个列 "user_id"
,其中有四个可能的值 [4823, 1292, 9634, 7431]
。我想将这些值映射到 [0, 1, 2, 3]
并将结果存储为新列 "user_id_mapped"
.
在 Dask 中实现此目的最有效的方法是什么?一种可能性是将主 df
左连接到 lookup_df
,但连接是一个相当复杂的操作。即使在普通 Pandas 中,基于索引的解决方案通常也比 join/merge 快得多,例如:
N = 100000
user_ids = [4823, 1292, 9634, 7431]
df = pd.DataFrame({
"user_id": np.random.choice(user_ids, size=N),
"dummy": np.random.uniform(size=N),
})
id_lookup_series = pd.Series(data=[0, 1, 2, 3], index=user_ids)
df["user_id_mapped"] = id_lookup_series[df["user_id"]].reset_index(drop=True)
我无法将这种方法转移到 Dask,因为静态 id_lookup_series
是一个普通的 Pandas 系列,而索引 df["user_id"]
是一个 Dask 系列。是否可以在 Dask 中执行这种快速连接?
Pandas 解决方案
如果将 Pandas 系列转换为 DataFrame,则可以使用合并
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: N = 100000
In [4]: user_ids = [4823, 1292, 9634, 7431]
In [5]: df = pd.DataFrame({
...: "user_id": np.random.choice(user_ids, size=N),
...: "dummy": np.random.uniform(size=N),
...: })
...:
...: id_lookup_series = pd.Series(data=[0, 1, 2, 3], index=user_ids)
...:
In [6]: result = df.merge(id_lookup_series.to_frame(), left_on='user_id', right_
...: index=True)
In [7]: result.head()
Out[7]:
dummy user_id 0
0 0.416698 1292 1
1 0.053371 1292 1
6 0.407371 1292 1
14 0.772367 1292 1
18 0.958009 1292 1
Dask Dataframe 解决方案
以上所有内容在 Dask.dataframe 中也能正常工作。我不确定您是否提前知道用户 ID,所以我添加了一个步骤来计算它们。
In [1]: import numpy as np
In [2]: import pandas as pd
N
In [3]: N = 100000
In [4]: user_ids = [4823, 1292, 9634, 7431]
In [5]: df = pd.DataFrame({
...: "user_id": np.random.choice(user_ids, size=N),
...: "dummy": np.random.uniform(size=N),
...: })
In [6]: import dask.dataframe as dd
In [7]: ddf = dd.from_pandas(df, npartitions=10)
In [8]: user_ids = ddf.user_id.drop_duplicates().compute()
In [9]: id_lookup_series = pd.Series(list(range(len(user_ids))), index=user_ids.values)
In [10]: result = ddf.merge(id_lookup_series.to_frame(), left_on='user_id', right_index=True)
In [11]: result.head()
Out[11]:
dummy user_id 0
0 0.364693 4823 0
5 0.934778 4823 0
14 0.970289 4823 0
15 0.561710 4823 0
21 0.838962 4823 0
我不确定为什么提供的代码如此复杂。根据我在您的示例问题描述中读到的内容,您需要将一组值替换为另一组值,因此您将 Series.replace(to_replace={})
方法与 Dask.DataFrame.map_partitions()
:
def replacer(df, to_replace):
df['user_id_mapped'] = df['user_id'].replace(to_replace=to_replace)
return df
new_dask_df = dask_df.map_partitions(
replacer,
to_replace={4823: 0, 1292: 1, 9634: 2, 7431: 3}
)
P.S。您可能想了解 map_partitions
的 meta
参数,并考虑将代码组织成 class 以使其更好并避免闭包,但这是另一个主题。