使用 dask 将单个 16M 行 csv 并行转换为 Parquet
parallelize conversion of a single 16M row csv to Parquet with dask
以下操作有效,但耗时将近 2 小时:
from dask import dataframe as ddf
ddf.read_csv('data.csv').to_parquet('data.pq')
有没有办法并行化它?
文件 data.csv
约为 2G 未压缩,包含 1600 万行 x 22 列。
不知道是不是数据的问题。我在我的机器上做了一个玩具示例,同样的命令需要大约 9 秒
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask
client = Client()
# if you wish to connect to the dashboard
client
# fake df size ~2.1 GB
# takes ~180 seconds
N = int(5e6)
df = pd.DataFrame({i: np.random.rand(N)
for i in range(22)})
df.to_csv("data.csv", index=False)
# the following takes ~9 seconds on my machine
dd.read_csv("data.csv").to_parquet("data_pq")
使用 repartition
方法并行写出多个 CSV 文件也很容易:
df = dd.read_csv('data.csv')
df = df.repartition(npartitions=20)
df.to_parquet('./data_pq', write_index=False, compression='snappy')
Dask 喜欢使用大约 100 MB 的分区,因此 20 个分区应该适合 2GB 的数据集。
您还可以通过在读取 CSV 之前拆分 CSV 来加快速度,这样 Dask 就可以并行读取 CSV 文件。您可以使用 these tactics 将 2GB CSV 分成 20 个不同的 CSV,然后在不重新分区的情况下将它们写出:
df = dd.read_csv('folder_with_small_csvs/*.csv')
df.to_parquet('./data_pq', write_index=False, compression='snappy')
read_csv
有一个 blocksize
参数 (docs),您可以使用它来控制生成的分区的大小,从而控制分区的数量。据我所知,这将导致并行读取分区——每个工作人员将读取相关偏移量的块大小。
您可以设置 blocksize
以产生所需数量的分区以利用您拥有的内核。例如
cores = 8
size = os.path.getsize('data.csv')
ddf = dd.read_csv("data.csv", blocksize=np.rint(size/cores))
print(ddf.npartitions)
将输出:
8
更好的是,您可以尝试修改大小,以便生成的镶木地板具有推荐大小的分区(我在不同的地方看到了不同的数字:-|)。
以下操作有效,但耗时将近 2 小时:
from dask import dataframe as ddf
ddf.read_csv('data.csv').to_parquet('data.pq')
有没有办法并行化它?
文件 data.csv
约为 2G 未压缩,包含 1600 万行 x 22 列。
不知道是不是数据的问题。我在我的机器上做了一个玩具示例,同样的命令需要大约 9 秒
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask
client = Client()
# if you wish to connect to the dashboard
client
# fake df size ~2.1 GB
# takes ~180 seconds
N = int(5e6)
df = pd.DataFrame({i: np.random.rand(N)
for i in range(22)})
df.to_csv("data.csv", index=False)
# the following takes ~9 seconds on my machine
dd.read_csv("data.csv").to_parquet("data_pq")
使用 repartition
方法并行写出多个 CSV 文件也很容易:
df = dd.read_csv('data.csv')
df = df.repartition(npartitions=20)
df.to_parquet('./data_pq', write_index=False, compression='snappy')
Dask 喜欢使用大约 100 MB 的分区,因此 20 个分区应该适合 2GB 的数据集。
您还可以通过在读取 CSV 之前拆分 CSV 来加快速度,这样 Dask 就可以并行读取 CSV 文件。您可以使用 these tactics 将 2GB CSV 分成 20 个不同的 CSV,然后在不重新分区的情况下将它们写出:
df = dd.read_csv('folder_with_small_csvs/*.csv')
df.to_parquet('./data_pq', write_index=False, compression='snappy')
read_csv
有一个 blocksize
参数 (docs),您可以使用它来控制生成的分区的大小,从而控制分区的数量。据我所知,这将导致并行读取分区——每个工作人员将读取相关偏移量的块大小。
您可以设置 blocksize
以产生所需数量的分区以利用您拥有的内核。例如
cores = 8
size = os.path.getsize('data.csv')
ddf = dd.read_csv("data.csv", blocksize=np.rint(size/cores))
print(ddf.npartitions)
将输出:
8
更好的是,您可以尝试修改大小,以便生成的镶木地板具有推荐大小的分区(我在不同的地方看到了不同的数字:-|)。