dask 包不使用所有内核?备择方案?
dask bag not using all cores? alternatives?
我有一个 python 脚本,它执行以下操作:
一世。它采用数据输入文件(通常是嵌套 JSON 格式)
二.将数据逐行传递给另一个函数,该函数将数据操作为所需格式
三.最后它将输出写入文件。
这是我当前执行此操作的简单 python 行...
def manipulate(line):
# a pure python function which transforms the data
# ...
return manipulated_json
for line in f:
components.append(manipulate(ujson.loads(line)))
write_to_csv(components)`
这行得通,但是 python GIL 将其限制为服务器上的一个核心,速度非常慢,尤其是在处理大量数据时。
我通常处理的数据量约为 4 GB gzip 压缩,但偶尔我必须处理数百 GB 的 gzip 压缩数据。它不一定是大数据,但仍然不能全部在内存中处理,并且使用 Python 的 GIL 处理起来非常慢。
在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定尝试一下。
在对 dask 及其使用方法进行了大量研究之后,我整理了一个非常小的脚本来复制我当前的过程。脚本如下所示:
import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
这有效并产生与我原来的非 dask 脚本相同的结果,但它 仍然只在服务器上使用一个 CPU。所以,它根本没有帮助。事实上,它更慢。
我做错了什么?我错过了什么吗?我对 dask 还很陌生,所以如果我忽略了什么或者我是否应该做一些完全不同的事情,请告诉我。
另外,对于我需要做的事情,是否有替代 dask 的方法来使用服务器的全部容量(即所有 CPUs)?
谢谢,
T
这里的问题是 dask.dataframe.to_csv
,它强制您进入单核模式。
我建议使用 dask.bag
进行阅读和操作,然后并行转储到一堆 CSV 文件中。转储到许多 CSV 文件比转储到单个 CSV 文件更容易协调。
import dask.bag as bag
import json
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute()
尝试并行读取单个 GZIP 文件也可能存在问题,但以上内容应该可以帮助您入门。
似乎袋子的平行度取决于它们的分区数。
对我来说,运行
mybag=bag.from_filenames(filename, chunkbytes=1e7)
mybag.npartitions
产量
1746
解决了问题并使处理完全可并行化。
如果您提供基于 glob 的文件名,例如MyFiles-*.csv
到 dask dataframe.to_csv()
你应该能够将数据帧输出到磁盘。
它将创建多个文件而不是 1 个大的 csv 文件。 mre https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ
请参阅此主题
MyFiles-0001.csv
MyFiles-0002.csv
....
我有一个 python 脚本,它执行以下操作: 一世。它采用数据输入文件(通常是嵌套 JSON 格式) 二.将数据逐行传递给另一个函数,该函数将数据操作为所需格式 三.最后它将输出写入文件。
这是我当前执行此操作的简单 python 行...
def manipulate(line):
# a pure python function which transforms the data
# ...
return manipulated_json
for line in f:
components.append(manipulate(ujson.loads(line)))
write_to_csv(components)`
这行得通,但是 python GIL 将其限制为服务器上的一个核心,速度非常慢,尤其是在处理大量数据时。
我通常处理的数据量约为 4 GB gzip 压缩,但偶尔我必须处理数百 GB 的 gzip 压缩数据。它不一定是大数据,但仍然不能全部在内存中处理,并且使用 Python 的 GIL 处理起来非常慢。
在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定尝试一下。
在对 dask 及其使用方法进行了大量研究之后,我整理了一个非常小的脚本来复制我当前的过程。脚本如下所示:
import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
这有效并产生与我原来的非 dask 脚本相同的结果,但它 仍然只在服务器上使用一个 CPU。所以,它根本没有帮助。事实上,它更慢。
我做错了什么?我错过了什么吗?我对 dask 还很陌生,所以如果我忽略了什么或者我是否应该做一些完全不同的事情,请告诉我。
另外,对于我需要做的事情,是否有替代 dask 的方法来使用服务器的全部容量(即所有 CPUs)?
谢谢,
T
这里的问题是 dask.dataframe.to_csv
,它强制您进入单核模式。
我建议使用 dask.bag
进行阅读和操作,然后并行转储到一堆 CSV 文件中。转储到许多 CSV 文件比转储到单个 CSV 文件更容易协调。
import dask.bag as bag
import json
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute()
尝试并行读取单个 GZIP 文件也可能存在问题,但以上内容应该可以帮助您入门。
似乎袋子的平行度取决于它们的分区数。
对我来说,运行
mybag=bag.from_filenames(filename, chunkbytes=1e7)
mybag.npartitions
产量
1746
解决了问题并使处理完全可并行化。
如果您提供基于 glob 的文件名,例如MyFiles-*.csv
到 dask dataframe.to_csv()
你应该能够将数据帧输出到磁盘。
它将创建多个文件而不是 1 个大的 csv 文件。 mre https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ
MyFiles-0001.csv
MyFiles-0002.csv
....