Return Dask 中的结构化行应用

Return a structured row in Dask apply

我正在对 Dask 数据帧 中的所有行应用一个函数。在 PySpark 中,我能够 return 具有命名参数的 spark.sql.Row 对象,以便为结果 DataFrame 创建结构化行。在 dask 数据帧中的行上应用函数时,如何 return 结构相似的行(具有列和类型)?

我正在寻找类似的东西:

# df is a dask.dataframe with a JSON blob in the `data` column

def process(row):
    json_data = json.loads(row.data)
    return Row(a=json_data["a"], b=json_data["b")

result = df.apply(
    process,
    axis=1,
).compute()

result

我看到这些行本身就是 pd.Series,所以我尝试 process returning Series 但我得到了这个错误

AttributeError: 'Series' object has no attribute 'columns'

documentation 建议我可以在 apply:

中使用 meta 参数

meta: An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output ... [Inputs like] iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns)

然而,当我按照建议使用 iterable 元元组时

result = df.apply(
    process,
    axis=1,
    meta=[("a", "int")]
).compute()

它需要一个 DataFrame 对象并且 return 这个错误

AttributeError: 'DataFrame' object has no attribute 'name'

这是围绕 pandas 函数开发的 dask 包装器 here:

# see unutbu's answer here: 
import json
def json_to_series(text):
    keys, values = zip(*[item for dct in json.loads(text) for item in dct.items()])
    return pd.Series(values, index=keys)


def process_chunk(df):
    _tmp = df['data'].apply(json_to_series)
    return pd.concat([df, _tmp], axis=1)

result = df.map_partitions(process_chunk).compute()