将多个文件的 Dask 包放入带列的 Dask 数据框中
Dask bag from multiple files into Dask dataframe with columns
我得到了一个文件名列表 files
,其中包含逗号分隔的数据,这些数据必须被清理并通过包含基于文件名的信息的列进一步扩展。因此,我实现了一个小的 read_file
函数,它处理初始清理以及附加列的计算。使用 db.from_sequence(files).map(read_file)
,我将读取函数映射到所有文件,每个文件都得到一个字典列表。
但是,我希望我的包中包含输入文件的每一行作为一个条目,而不是字典列表。随后,我想将字典的键映射到 dask 数据框中的列名。
from dask import bag as db
def read_file(filename):
ret = []
with open(filename, 'r') as fp:
... # reading line of file and storing result in dict
ret.append({'a': val_a, 'b': val_b, 'c': val_c})
return ret
from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
有人可以让我知道我必须更改什么才能获得此代码 运行 吗?是否有更合适的不同方法?
编辑:
我创建了三个测试文件 a_20160101.txt
、a_20160102.txt
、a_20160103.txt
。所有这些都只包含几行,每行只有一个字符串。
asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads
f
之前我在 read_file
中有一个小错误,但是现在,在映射到 reader 之后调用 my_bag.take(10)
工作正常:
([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)
然而 my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
随后
my_df.head(10)
仍然加注 dask.async.AssertionError: 3 columns passed, passed data had 10 columns
您可能需要致电 flatten
您的文件名包如下所示:
['a.txt',
'b.txt',
'c.txt']
调用 map 后你的包看起来像这样:
[[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}],
[{'a': 1, 'b': 2, 'c': 3}],
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]]
每个文件都变成了字典列表。现在你的包有点像字典列表。
.to_dataframe
方法希望您有一个字典列表。因此,让我们将我们的包连接成一个扁平化的字典集合
my_bag = db.from_sequence(files).map(read_file).flatten()
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30},
{'a': 1, 'b': 2, 'c': 3},
{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]
我得到了一个文件名列表 files
,其中包含逗号分隔的数据,这些数据必须被清理并通过包含基于文件名的信息的列进一步扩展。因此,我实现了一个小的 read_file
函数,它处理初始清理以及附加列的计算。使用 db.from_sequence(files).map(read_file)
,我将读取函数映射到所有文件,每个文件都得到一个字典列表。
但是,我希望我的包中包含输入文件的每一行作为一个条目,而不是字典列表。随后,我想将字典的键映射到 dask 数据框中的列名。
from dask import bag as db
def read_file(filename):
ret = []
with open(filename, 'r') as fp:
... # reading line of file and storing result in dict
ret.append({'a': val_a, 'b': val_b, 'c': val_c})
return ret
from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
有人可以让我知道我必须更改什么才能获得此代码 运行 吗?是否有更合适的不同方法?
编辑:
我创建了三个测试文件 a_20160101.txt
、a_20160102.txt
、a_20160103.txt
。所有这些都只包含几行,每行只有一个字符串。
asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads
f
之前我在 read_file
中有一个小错误,但是现在,在映射到 reader 之后调用 my_bag.take(10)
工作正常:
([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)
然而 my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
随后
my_df.head(10)
仍然加注 dask.async.AssertionError: 3 columns passed, passed data had 10 columns
您可能需要致电 flatten
您的文件名包如下所示:
['a.txt',
'b.txt',
'c.txt']
调用 map 后你的包看起来像这样:
[[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}],
[{'a': 1, 'b': 2, 'c': 3}],
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]]
每个文件都变成了字典列表。现在你的包有点像字典列表。
.to_dataframe
方法希望您有一个字典列表。因此,让我们将我们的包连接成一个扁平化的字典集合
my_bag = db.from_sequence(files).map(read_file).flatten()
[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30},
{'a': 1, 'b': 2, 'c': 3},
{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]