Return Dask 中的结构化行应用
Return a structured row in Dask apply
我正在对 Dask 数据帧 中的所有行应用一个函数。在 PySpark
中,我能够 return 具有命名参数的 spark.sql.Row
对象,以便为结果 DataFrame
创建结构化行。在 dask 数据帧中的行上应用函数时,如何 return 结构相似的行(具有列和类型)?
我正在寻找类似的东西:
# df is a dask.dataframe with a JSON blob in the `data` column
def process(row):
json_data = json.loads(row.data)
return Row(a=json_data["a"], b=json_data["b")
result = df.apply(
process,
axis=1,
).compute()
result
我看到这些行本身就是 pd.Series
,所以我尝试 process
returning Series
但我得到了这个错误
AttributeError: 'Series' object has no attribute 'columns'
documentation 建议我可以在 apply
:
中使用 meta
参数
meta: An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output ... [Inputs like] iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns)
然而,当我按照建议使用 iterable
元元组时
result = df.apply(
process,
axis=1,
meta=[("a", "int")]
).compute()
它需要一个 DataFrame
对象并且 return 这个错误
AttributeError: 'DataFrame' object has no attribute 'name'
这是围绕 pandas 函数开发的 dask 包装器 here:
# see unutbu's answer here:
import json
def json_to_series(text):
keys, values = zip(*[item for dct in json.loads(text) for item in dct.items()])
return pd.Series(values, index=keys)
def process_chunk(df):
_tmp = df['data'].apply(json_to_series)
return pd.concat([df, _tmp], axis=1)
result = df.map_partitions(process_chunk).compute()
我正在对 Dask 数据帧 中的所有行应用一个函数。在 PySpark
中,我能够 return 具有命名参数的 spark.sql.Row
对象,以便为结果 DataFrame
创建结构化行。在 dask 数据帧中的行上应用函数时,如何 return 结构相似的行(具有列和类型)?
我正在寻找类似的东西:
# df is a dask.dataframe with a JSON blob in the `data` column
def process(row):
json_data = json.loads(row.data)
return Row(a=json_data["a"], b=json_data["b")
result = df.apply(
process,
axis=1,
).compute()
result
我看到这些行本身就是 pd.Series
,所以我尝试 process
returning Series
但我得到了这个错误
AttributeError: 'Series' object has no attribute 'columns'
documentation 建议我可以在 apply
:
meta
参数
meta: An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output ... [Inputs like] iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns)
然而,当我按照建议使用 iterable
元元组时
result = df.apply(
process,
axis=1,
meta=[("a", "int")]
).compute()
它需要一个 DataFrame
对象并且 return 这个错误
AttributeError: 'DataFrame' object has no attribute 'name'
这是围绕 pandas 函数开发的 dask 包装器 here:
# see unutbu's answer here:
import json
def json_to_series(text):
keys, values = zip(*[item for dct in json.loads(text) for item in dct.items()])
return pd.Series(values, index=keys)
def process_chunk(df):
_tmp = df['data'].apply(json_to_series)
return pd.concat([df, _tmp], axis=1)
result = df.map_partitions(process_chunk).compute()