在 Dask 中 join/lookup/map 列值的最有效方法?

Most efficient way to join/lookup/map column values in Dask?

给定一个 Dask DataFrame,我试图找到最有效的方法来应用静态值查找。

示例问题:我的数据有一个列 "user_id",其中有四个可能的值 [4823, 1292, 9634, 7431]。我想将这些值映射到 [0, 1, 2, 3] 并将结果存储为新列 "user_id_mapped".

在 Dask 中实现此目的最有效的方法是什么?一种可能性是将主 df 左连接到 lookup_df,但连接是一个相当复杂的操作。即使在普通 Pandas 中,基于索引的解决方案通常也比 join/merge 快得多,例如:

N = 100000
user_ids = [4823, 1292, 9634, 7431]

df = pd.DataFrame({
    "user_id": np.random.choice(user_ids, size=N),
    "dummy": np.random.uniform(size=N),
})

id_lookup_series = pd.Series(data=[0, 1, 2, 3], index=user_ids)

df["user_id_mapped"] = id_lookup_series[df["user_id"]].reset_index(drop=True)

我无法将这种方法转移到 Dask,因为静态 id_lookup_series 是一个普通的 Pandas 系列,而索引 df["user_id"] 是一个 Dask 系列。是否可以在 Dask 中执行这种快速连接?

Pandas 解决方案

如果将 Pandas 系列转换为 DataFrame,则可以使用合并

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: N = 100000

In [4]: user_ids = [4823, 1292, 9634, 7431]

In [5]: df = pd.DataFrame({
   ...:     "user_id": np.random.choice(user_ids, size=N),
   ...:     "dummy": np.random.uniform(size=N),
   ...: })
   ...: 
   ...: id_lookup_series = pd.Series(data=[0, 1, 2, 3], index=user_ids)
   ...: 

In [6]: result = df.merge(id_lookup_series.to_frame(), left_on='user_id', right_
   ...: index=True)

In [7]: result.head()
Out[7]: 
       dummy  user_id  0
0   0.416698     1292  1
1   0.053371     1292  1
6   0.407371     1292  1
14  0.772367     1292  1
18  0.958009     1292  1

Dask Dataframe 解决方案

以上所有内容在 Dask.dataframe 中也能正常工作。我不确定您是否提前知道用户 ID,所以我添加了一个步骤来计算它们。

In [1]: import numpy as np

In [2]: import pandas as pd
N 
In [3]: N = 100000

In [4]: user_ids = [4823, 1292, 9634, 7431]

In [5]: df = pd.DataFrame({
   ...:     "user_id": np.random.choice(user_ids, size=N),
   ...:     "dummy": np.random.uniform(size=N),
   ...: })

In [6]: import dask.dataframe as dd

In [7]: ddf = dd.from_pandas(df, npartitions=10)

In [8]: user_ids = ddf.user_id.drop_duplicates().compute()

In [9]: id_lookup_series = pd.Series(list(range(len(user_ids))), index=user_ids.values)

In [10]: result = ddf.merge(id_lookup_series.to_frame(), left_on='user_id', right_index=True)

In [11]: result.head()
Out[11]: 
       dummy  user_id  0
0   0.364693     4823  0
5   0.934778     4823  0
14  0.970289     4823  0
15  0.561710     4823  0
21  0.838962     4823  0

我不确定为什么提供的代码如此复杂。根据我在您的示例问题描述中读到的内容,您需要将一组值替换为另一组值,因此您将 Series.replace(to_replace={}) 方法与 Dask.DataFrame.map_partitions():

结合使用
def replacer(df, to_replace):
    df['user_id_mapped'] = df['user_id'].replace(to_replace=to_replace)
    return df

new_dask_df = dask_df.map_partitions(
    replacer,
    to_replace={4823: 0, 1292: 1, 9634: 2, 7431: 3}
)

P.S。您可能想了解 map_partitionsmeta 参数,并考虑将代码组织成 class 以使其更好并避免闭包,但这是另一个主题。