在大型数据集上 Bag.to_avro 因为 Killed/MemoryError 失败
Failing on Bag.to_avro becase of Killed/MemoryError on a large dataset
我正在尝试处理大量由换行符分隔的文本文件。这些文件是 gzip 压缩的,我将文件分成小块,未压缩的文件约为 100mb 左右。我总共有 296 个单独的压缩文件,未压缩的总大小约为 30Gb。
行是 NQuads,我正在使用 Bag
将行映射为可以导入数据库的格式。行被按键折叠,以便我可以合并与单个页面相关的行。
这是我用来读取文件并折叠它们的代码。
with dask.config.set(num_workers=2):
n_quads_bag = dask.bag.\
read_text(files)
uri_nquads_bag = n_quads_bag.\
map(parser.parse).\
filter(lambda x: x is not None).\
map(nquad_tuple_to_page_dict).\
foldby('uri', binop=binop).\
pluck(1).\
map(lang_extract)
然后我将数据规范化为页面和实体。我通过一个映射函数来做到这一点,该函数将事物拆分为 (page, entities)
的元组。我正在提取数据,然后将其写入 Avro 中的两组独立文件。
pages_entities_bag = uri_nquads_bag.\
map(map_page_entities)
pages_bag = pages_entities_bag.\
pluck(0).\
map(page_extractor).\
map(extract_uri_details).\
map(ntriples_to_dict)
entities_bag = pages_entities_bag.\
pluck(1) .\
flatten().\
map(entity_extractor).\
map(ntriples_to_dict)
with ProgressBar():
pages_bag.to_avro(
os.path.join(output_folder, 'pages.*.avro'),
schema=page_avro_scheme,
codec='snappy',
compute=True)
entities_bag.to_avro(
os.path.join(output_folder, 'entities.*.avro'),
schema=entities_avro_schema,
codec='snappy',
compute=True)
代码在 pages_bag.to_avro(... compute=True)
和 Killed/MemoryError
上失败。我尝试减小分区大小并将处理器数量减少到 2 个。
我设置错了吗compute=True
?这是整个数据集被带入内存的原因吗?如果是这样,我还能如何获取要写入的文件?
或者页面或实体的分区是否对计算机来说太大了?
我的另一个问题是我是否错误地使用了 Bags
?这是解决我要解决的问题的正确方法吗?
我运行这台机器的规格:
- 4 CPU
- 16GB 内存
- 375 暂存盘
避免 运行 内存不足的方法是保持文件 ~100MB 未压缩并使用 groupby
。正如 Dask 文档所述,您可以强制它在磁盘上随机播放。 groupby
支持在输出上设置多个分区。
with dask.config.set(num_workers=2):
n_quads_bag = dask.bag.\
read_text(files)
uri_nquads_bag = n_quads_bag.\
map(parser.parse).\
filter(lambda x: x is not None).\
map(nquad_tuple_to_page_dict).\
groupby(lambda x: x[3], shuffle='disk', npartitions=n_quads_bag.npartitions).\
map(grouped_nquads_to_dict).\
map(lang_extract)
我正在尝试处理大量由换行符分隔的文本文件。这些文件是 gzip 压缩的,我将文件分成小块,未压缩的文件约为 100mb 左右。我总共有 296 个单独的压缩文件,未压缩的总大小约为 30Gb。
行是 NQuads,我正在使用 Bag
将行映射为可以导入数据库的格式。行被按键折叠,以便我可以合并与单个页面相关的行。
这是我用来读取文件并折叠它们的代码。
with dask.config.set(num_workers=2):
n_quads_bag = dask.bag.\
read_text(files)
uri_nquads_bag = n_quads_bag.\
map(parser.parse).\
filter(lambda x: x is not None).\
map(nquad_tuple_to_page_dict).\
foldby('uri', binop=binop).\
pluck(1).\
map(lang_extract)
然后我将数据规范化为页面和实体。我通过一个映射函数来做到这一点,该函数将事物拆分为 (page, entities)
的元组。我正在提取数据,然后将其写入 Avro 中的两组独立文件。
pages_entities_bag = uri_nquads_bag.\
map(map_page_entities)
pages_bag = pages_entities_bag.\
pluck(0).\
map(page_extractor).\
map(extract_uri_details).\
map(ntriples_to_dict)
entities_bag = pages_entities_bag.\
pluck(1) .\
flatten().\
map(entity_extractor).\
map(ntriples_to_dict)
with ProgressBar():
pages_bag.to_avro(
os.path.join(output_folder, 'pages.*.avro'),
schema=page_avro_scheme,
codec='snappy',
compute=True)
entities_bag.to_avro(
os.path.join(output_folder, 'entities.*.avro'),
schema=entities_avro_schema,
codec='snappy',
compute=True)
代码在 pages_bag.to_avro(... compute=True)
和 Killed/MemoryError
上失败。我尝试减小分区大小并将处理器数量减少到 2 个。
我设置错了吗compute=True
?这是整个数据集被带入内存的原因吗?如果是这样,我还能如何获取要写入的文件?
或者页面或实体的分区是否对计算机来说太大了?
我的另一个问题是我是否错误地使用了 Bags
?这是解决我要解决的问题的正确方法吗?
我运行这台机器的规格:
- 4 CPU
- 16GB 内存
- 375 暂存盘
避免 运行 内存不足的方法是保持文件 ~100MB 未压缩并使用 groupby
。正如 Dask 文档所述,您可以强制它在磁盘上随机播放。 groupby
支持在输出上设置多个分区。
with dask.config.set(num_workers=2):
n_quads_bag = dask.bag.\
read_text(files)
uri_nquads_bag = n_quads_bag.\
map(parser.parse).\
filter(lambda x: x is not None).\
map(nquad_tuple_to_page_dict).\
groupby(lambda x: x[3], shuffle='disk', npartitions=n_quads_bag.npartitions).\
map(grouped_nquads_to_dict).\
map(lang_extract)