dask 包不使用所有内核?备择方案?

dask bag not using all cores? alternatives?

我有一个 python 脚本,它执行以下操作: 一世。它采用数据输入文件(通常是嵌套 JSON 格式) 二.将数据逐行传递给另一个函数,该函数将数据操作为所需格式 三.最后它将输出写入文件。

这是我当前执行此操作的简单 python 行...

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

for line in f:
    components.append(manipulate(ujson.loads(line)))
    write_to_csv(components)`

这行得通,但是 python GIL 将其限制为服务器上的一个核心,速度非常慢,尤其是在处理大量数据时。

我通常处理的数据量约为 4 GB gzip 压缩,但偶尔我必须处理数百 GB 的 gzip 压缩数据。它不一定是大数据,但仍然不能全部在内存中处理,并且使用 Python 的 GIL 处理起来非常慢。

在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定尝试一下。

在对 dask 及其使用方法进行了大量研究之后,我整理了一个非常小的脚本来复制我当前的过程。脚本如下所示:

import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`

这有效并产生与我原来的非 dask 脚本相同的结果,但它 仍然只在服务器上使用一个 CPU。所以,它根本没有帮助。事实上,它更慢。

我做错了什么?我错过了什么吗?我对 dask 还很陌生,所以如果我忽略了什么或者我是否应该做一些完全不同的事情,请告诉我。

另外,对于我需要做的事情,是否有替代 dask 的方法来使用服务器的全部容量(即所有 CPUs)?

谢谢,

T

这里的问题是 dask.dataframe.to_csv,它强制您进入单核模式。

我建议使用 dask.bag 进行阅读和操作,然后并行转储到一堆 CSV 文件中。转储到许多 CSV 文件比转储到单个 CSV 文件更容易协调。

import dask.bag as bag
import json
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute()

尝试并行读取单个 GZIP 文件也可能存在问题,但以上内容应该可以帮助您入门。

似乎袋子的平行度取决于它们的分区数。

对我来说,运行

mybag=bag.from_filenames(filename, chunkbytes=1e7)
mybag.npartitions

产量

1746

解决了问题并使处理完全可并行化。

如果您提供基于 glob 的文件名,例如MyFiles-*.csv 到 dask dataframe.to_csv() 你应该能够将数据帧输出到磁盘。 它将创建多个文件而不是 1 个大的 csv 文件。 mre https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ

请参阅此主题
MyFiles-0001.csv  
MyFiles-0002.csv 
....