Repartitioning a pyarrow Table by size with pyarrow and writing it into several parquet files?
As the title says, I would like to repartition a pyarrow Table by size (or by row group size) using pyarrow and write it out as several parquet files.
I have looked through the pyarrow documentation and identified the partitioned-dataset chapter, which seems to point in the right direction. Unfortunately, it shows how to partition by column content, but not by size (or row group size).
So, starting from a Table, how can I control the write step so that several files are written with a controlled size of x MB (or a given row group size)?
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
file = 'example.parquet'
file_res = 'example_res'
# Generate a random df
df = pd.DataFrame(np.random.randint(100, size=(100000, 20)), columns=list('ABCDEFGHIJKLMNOPQRST'))
table = pa.Table.from_pandas(df)
# With this command, I can write a single parquet file that contains 2 row groups.
pq.write_table(table, file, version='2.0', row_group_size=50000)
# I can read it back and try to write it as a partitioned dataset, but a single parquet file is then written.
table_new = pq.ParquetFile(file).read()
pq.write_to_dataset(table_new, file_res)
Thanks for your help!
Best,
Looking at the documentation for write_to_dataset and ParquetWriter, nothing obvious comes to mind.
But you can assign a bucket to each row and partition the data on that bucket, for example:
df = (
    pd.DataFrame(np.random.randint(100, size=(100000, 20)), columns=list('ABCDEFGHIJKLMNOPQRST'))
    # 5000 rows per bucket -> 20 buckets for 100000 rows
    .assign(bucket=lambda x: x.index // 5000)
)
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, file_res, partition_cols=['bucket'])
You get the following file structure:
bucket=0
bucket=1
bucket=10
bucket=11
bucket=12
bucket=13
bucket=14
bucket=15
bucket=16
bucket=17
bucket=18
bucket=19
bucket=2
bucket=3
bucket=4
bucket=5
bucket=6
bucket=7
bucket=8
bucket=9
This assumes that your df.index starts at zero and increases by one (0, 1, 2, 3, ...).
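If your index is not a default RangeIndex, or if you would rather aim for an approximate file size in MB instead of a fixed row count, a minimal sketch along these lines should work. The names target_mb, bytes_per_row and rows_per_bucket are just illustrative, and Table.nbytes measures the in-memory Arrow size, so the resulting parquet files will typically be smaller once encoded and compressed:
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame(np.random.randint(100, size=(100000, 20)),
                  columns=list('ABCDEFGHIJKLMNOPQRST'))
table = pa.Table.from_pandas(df)

# Estimate how many rows fit into roughly target_mb of in-memory data.
# nbytes is the uncompressed Arrow size, so this is only an upper bound
# on the size of each written file.
target_mb = 4  # hypothetical target size per bucket
bytes_per_row = table.nbytes / table.num_rows
rows_per_bucket = max(1, int(target_mb * 1024 * 1024 / bytes_per_row))

# Assign buckets by position instead of relying on df.index, so this also
# works for shuffled, datetime or otherwise non-default indexes.
df = df.assign(bucket=np.arange(len(df)) // rows_per_bucket)

table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, 'example_res', partition_cols=['bucket'])
Note that write_to_dataset encodes bucket in the bucket=N directory names rather than in the data files themselves, so reading the dataset back with pq.read_table('example_res') restores it as a partition column.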