使用 dask.delayed 和 pandas.DataFrame 将 dask.bag 个词典转换为 dask.dataframe
convert dask.bag of dictionaries to dask.dataframe using dask.delayed and pandas.DataFrame
我正在努力将 dask.bag
词典转换为 dask.delayed
pandas.DataFrames
最终 dask.dataframe
我有一个函数 (make_dict) 将文件读入一个相当复杂的嵌套字典结构,另一个函数 (make_df) 将这些字典转换为 pandas.DataFrame
(结果数据帧每个文件大约 100 MB)。我想将所有数据帧附加到一个 dask.dataframe
中以供进一步分析。
到目前为止,我一直在使用 dask.delayed
对象来加载、转换和附加所有工作正常的数据(请参见下面的示例)。但是对于未来的工作,我想使用 dask.persist()
.
将加载的词典存储在 dask.bag
中
我设法将数据加载到 dask.bag
,生成了一个字典列表或 pandas.DataFrame
列表,我可以在调用 compute()
后在本地使用它们。然而,当我尝试使用 to_delayed()
将 dask.bag
变成 dask.dataframe
时,我遇到了一个错误(见下文)。
感觉好像我在这里遗漏了一些相当简单的东西,或者我对 dask.bag
的处理方法是错误的?
下面的例子展示了我使用简化函数的方法并抛出了同样的错误。任何有关如何解决此问题的建议都将受到赞赏。
import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag
print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2
def make_dict(n=1):
return {"name":"dictionary","data":{'A':np.arange(n),'B':np.arange(n)}}
def make_df(d):
return pd.DataFrame(d['data'])
k = [1,2,3]
# using dask.delayed
dfs = []
for n in k:
delayed_1 = dask.delayed(make_dict)(n)
delayed_2 = dask.delayed(make_df)(delayed_1)
dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected
# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)
df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails
# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [ A B
# 0 0 0]
我最终想使用分布式调度程序做什么:
b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())
在 bag case 中,延迟对象指向元素列表,因此您有一个 pandas 数据帧列表列表,这不是您想要的。两条推荐
- 坚持 dask.delayed。它似乎很适合你
- 使用 Bag.to_dataframe 方法,它需要一袋字典,并自己进行数据帧转换
我正在努力将 dask.bag
词典转换为 dask.delayed
pandas.DataFrames
最终 dask.dataframe
我有一个函数 (make_dict) 将文件读入一个相当复杂的嵌套字典结构,另一个函数 (make_df) 将这些字典转换为 pandas.DataFrame
(结果数据帧每个文件大约 100 MB)。我想将所有数据帧附加到一个 dask.dataframe
中以供进一步分析。
到目前为止,我一直在使用 dask.delayed
对象来加载、转换和附加所有工作正常的数据(请参见下面的示例)。但是对于未来的工作,我想使用 dask.persist()
.
dask.bag
中
我设法将数据加载到 dask.bag
,生成了一个字典列表或 pandas.DataFrame
列表,我可以在调用 compute()
后在本地使用它们。然而,当我尝试使用 to_delayed()
将 dask.bag
变成 dask.dataframe
时,我遇到了一个错误(见下文)。
感觉好像我在这里遗漏了一些相当简单的东西,或者我对 dask.bag
的处理方法是错误的?
下面的例子展示了我使用简化函数的方法并抛出了同样的错误。任何有关如何解决此问题的建议都将受到赞赏。
import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag
print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2
def make_dict(n=1):
return {"name":"dictionary","data":{'A':np.arange(n),'B':np.arange(n)}}
def make_df(d):
return pd.DataFrame(d['data'])
k = [1,2,3]
# using dask.delayed
dfs = []
for n in k:
delayed_1 = dask.delayed(make_dict)(n)
delayed_2 = dask.delayed(make_df)(delayed_1)
dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected
# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)
df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails
# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [ A B
# 0 0 0]
我最终想使用分布式调度程序做什么:
b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())
在 bag case 中,延迟对象指向元素列表,因此您有一个 pandas 数据帧列表列表,这不是您想要的。两条推荐
- 坚持 dask.delayed。它似乎很适合你
- 使用 Bag.to_dataframe 方法,它需要一袋字典,并自己进行数据帧转换