对数据框进行 Dask 计算以添加列 returns AttributeError

Dask compute on dataframe to add column returns AttributeError

我有一个函数可以使用一个函数向 DataFrame 添加一列,例如

    def myfunc(x):
        resp_data = {'status': '1', 'data': x}
        return json.dumps(resp_data)

原来的Pandasdataframedf转换成DaskDataFrame如下

import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=30)

现在我在 ddf 上调用函数 myfunc 以使用现有列 att 添加新列 data_json,如下所示

ddf['data_json'] = ddf.apply(lambda row:myfunc(row['att']), 
                   axis=1, result_type='expand', meta=(None, 'str'))

当我调用 ddf.compute() 时,它因这个错误而中断

AttributeError: 'Series' object has no attribute 'columns'

使用

调用compute()后,我需要将ddf保存到一个文件中
ddf.to_csv("myfile.csv",  index=False, single_file=True)

如何处理错误以跳过生成此错误的那些行并继续处理和保存 Dask 数据帧?

几点建议:

  • 如果你的函数很简单,那么就没有必要将系列作为参数传递,所以像 ddf.apply(myfunc, axis=1) 这样的东西应该可以工作。如果函数有多个参数,那么函数的内容应该指定如何处理多个列。

  • 原来 json 不喜欢 numpy dtypes,所以在转储之前需要使用 int.

    转换值
  • 如果dataframe被保存为csv,那么之前就不需要.compute它了,因为它需要做两次相同的工作。

  • 如果myfunc不依赖于相邻的行,也可以使用.map_partitions.

import json

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame(range(5), columns=["x"]), npartitions=2)
ddf["y"] = 2 * ddf["x"]


def myfunc(row):
    """content of the function should specify how to handle different columns"""
    resp_data = {
        "status": "1",
        "y": int(row["y"]),
        "diff_data": int(row["y"] - row["x"]),
    }
    return json.dumps(resp_data)


ddf["data_json"] = ddf.apply(myfunc, axis=1, result_type="expand", meta=(None, "str"))

print(ddf.compute())
#    x  y                                data_json
# 0  0  0  {"status": "1", "y": 0, "diff_data": 0}
# 1  1  2  {"status": "1", "y": 2, "diff_data": 1}
# 2  2  4  {"status": "1", "y": 4, "diff_data": 2}
# 3  3  6  {"status": "1", "y": 6, "diff_data": 3}
# 4  4  8  {"status": "1", "y": 8, "diff_data": 4}

# if the dataframe only needs to be saved, there is no need for separate .compute
# ddf.to_csv("myfile.csv", index=False, single_file=True)