使用lambda函数添加列时的Dask map_partitions meta

Dask map_partitions meta when using lambda function to add column

我正在使用 Dask 应用一个函数 myfunc,该函数将两个新列 new_col_1new_col_2 添加到我的 Dask 数据框 data。此函数使用两列 a1a2 来计算新列。

ddata[['new_col_1', 'new_col_2']] = ddata.map_partitions(
lambda df: df.apply((lambda row: myfunc(row['a1'], row['a2'])), axis=1, 
                    result_type="expand")).compute()  

这会产生以下错误:

ValueError: Metadata inference failed in `lambda`.

You have supplied a custom function and Dask is unable to  determine the type of output that that function returns. 

To resolve this please provide a meta= keyword.

如何为这种情况提供 meta 关键字?

meta 可以通过 kwarg 提供给 .map_partitions:

some_result = dask_df.map_partitions(some_func, meta=expected_df)

expected_df 可以手动指定,或者您可以在一个小数据样本上显式计算它(在这种情况下它将是一个 pandas 数据帧)。

docs 中有更多详细信息。

Sultan 关于使用 meta 的回答是完美的。 :)

你也可以避免在这里使用 map_partitions 因为 Dask 实现了 apply,它在内部调用 map_partitions

import json
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': range(1,5),
                   'y': range(6,10),
                  }).astype('str')

ddf = dd.from_pandas(df, npartitions=2)

def myfunc(x):
    s = "string: " + x[0]
    j = json.dumps({'json': x[1]})
    return [s, j]

ddf[['new_col_1', 'new_col_2']] = ddf.apply(myfunc, axis=1, result_type="expand", meta={0: 'object', 1: 'object'})

ddf.compute()

# Output of ddf.compute():
#
#    x  y  new_col_1      new_col_2
# 0  1  6  string: 1  {"json": "6"}
# 1  2  7  string: 2  {"json": "7"}
# 2  3  8  string: 3  {"json": "8"}
# 3  4  9  string: 4  {"json": "9"}

此外,在您的代码片段中,调用 .compute() 将创建一个 pandas DataFrame,因此,如果您将其分配给 Dask DataFrame (ddata).我建议在分配后在 ddata 上调用 compute