对数据框进行 Dask 计算以添加列 returns AttributeError
Dask compute on dataframe to add column returns AttributeError
我有一个函数可以使用一个函数向 DataFrame 添加一列,例如
def myfunc(x):
resp_data = {'status': '1', 'data': x}
return json.dumps(resp_data)
原来的Pandasdataframedf
转换成Dask
DataFrame如下
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=30)
现在我在 ddf
上调用函数 myfunc
以使用现有列 att
添加新列 data_json
,如下所示
ddf['data_json'] = ddf.apply(lambda row:myfunc(row['att']),
axis=1, result_type='expand', meta=(None, 'str'))
当我调用 ddf.compute()
时,它因这个错误而中断
AttributeError: 'Series' object has no attribute 'columns'
使用
调用compute()
后,我需要将ddf
保存到一个文件中
ddf.to_csv("myfile.csv", index=False, single_file=True)
如何处理错误以跳过生成此错误的那些行并继续处理和保存 Dask 数据帧?
几点建议:
如果你的函数很简单,那么就没有必要将系列作为参数传递,所以像 ddf.apply(myfunc, axis=1)
这样的东西应该可以工作。如果函数有多个参数,那么函数的内容应该指定如何处理多个列。
原来 json
不喜欢 numpy
dtypes,所以在转储之前需要使用 int
.
转换值
如果dataframe被保存为csv,那么之前就不需要.compute
它了,因为它需要做两次相同的工作。
如果myfunc
不依赖于相邻的行,也可以使用.map_partitions
.
import json
import dask.dataframe as dd
import pandas as pd
ddf = dd.from_pandas(pd.DataFrame(range(5), columns=["x"]), npartitions=2)
ddf["y"] = 2 * ddf["x"]
def myfunc(row):
"""content of the function should specify how to handle different columns"""
resp_data = {
"status": "1",
"y": int(row["y"]),
"diff_data": int(row["y"] - row["x"]),
}
return json.dumps(resp_data)
ddf["data_json"] = ddf.apply(myfunc, axis=1, result_type="expand", meta=(None, "str"))
print(ddf.compute())
# x y data_json
# 0 0 0 {"status": "1", "y": 0, "diff_data": 0}
# 1 1 2 {"status": "1", "y": 2, "diff_data": 1}
# 2 2 4 {"status": "1", "y": 4, "diff_data": 2}
# 3 3 6 {"status": "1", "y": 6, "diff_data": 3}
# 4 4 8 {"status": "1", "y": 8, "diff_data": 4}
# if the dataframe only needs to be saved, there is no need for separate .compute
# ddf.to_csv("myfile.csv", index=False, single_file=True)
我有一个函数可以使用一个函数向 DataFrame 添加一列,例如
def myfunc(x):
resp_data = {'status': '1', 'data': x}
return json.dumps(resp_data)
原来的Pandasdataframedf
转换成Dask
DataFrame如下
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=30)
现在我在 ddf
上调用函数 myfunc
以使用现有列 att
添加新列 data_json
,如下所示
ddf['data_json'] = ddf.apply(lambda row:myfunc(row['att']),
axis=1, result_type='expand', meta=(None, 'str'))
当我调用 ddf.compute()
时,它因这个错误而中断
AttributeError: 'Series' object has no attribute 'columns'
使用
调用compute()
后,我需要将ddf
保存到一个文件中
ddf.to_csv("myfile.csv", index=False, single_file=True)
如何处理错误以跳过生成此错误的那些行并继续处理和保存 Dask 数据帧?
几点建议:
如果你的函数很简单,那么就没有必要将系列作为参数传递,所以像
ddf.apply(myfunc, axis=1)
这样的东西应该可以工作。如果函数有多个参数,那么函数的内容应该指定如何处理多个列。原来
转换值json
不喜欢numpy
dtypes,所以在转储之前需要使用int
.如果dataframe被保存为csv,那么之前就不需要
.compute
它了,因为它需要做两次相同的工作。如果
myfunc
不依赖于相邻的行,也可以使用.map_partitions
.
import json
import dask.dataframe as dd
import pandas as pd
ddf = dd.from_pandas(pd.DataFrame(range(5), columns=["x"]), npartitions=2)
ddf["y"] = 2 * ddf["x"]
def myfunc(row):
"""content of the function should specify how to handle different columns"""
resp_data = {
"status": "1",
"y": int(row["y"]),
"diff_data": int(row["y"] - row["x"]),
}
return json.dumps(resp_data)
ddf["data_json"] = ddf.apply(myfunc, axis=1, result_type="expand", meta=(None, "str"))
print(ddf.compute())
# x y data_json
# 0 0 0 {"status": "1", "y": 0, "diff_data": 0}
# 1 1 2 {"status": "1", "y": 2, "diff_data": 1}
# 2 2 4 {"status": "1", "y": 4, "diff_data": 2}
# 3 3 6 {"status": "1", "y": 6, "diff_data": 3}
# 4 4 8 {"status": "1", "y": 8, "diff_data": 4}
# if the dataframe only needs to be saved, there is no need for separate .compute
# ddf.to_csv("myfile.csv", index=False, single_file=True)