如何在 numpy/pandas 中处理来自 spark 的大型镶木地板文件?
How can I process a large parquet file from spark in numpy/pandas?
我为 pandas、numpy 和 spark 标签发布这个,因为我不确定在这三个系统中解决这个问题的最佳方法。
我有一个很大的 parquet 文件,下游进程无法打开它,因为它超出了系统内存(如果一次打开,内存约为 63gb)。我是这样写文件的:
FULL_MAIN.write.mode("overwrite").parquet(PATH+"/FULL_MAIN.parquet")
但是文件太大,所以我尝试这样做将文件分成更小的夹头:
split_factor = [.1,.1,.1,.1,.1,.1,.1,.1,.1,.1]
FULL_MAIN_RDD1,FULL_MAIN_RDD2,FULL_MAIN_RDD3,FULL_MAIN_RDD4,FULL_MAIN_RDD5, FULL_MAIN_RDD6,FULL_MAIN_RDD7,FULL_MAIN_RDD8,FULL_MAIN_RDD9,FULL_MAIN_RDD10 = FULL_MAIN.randomSplit(split_factor)
FULL_MAIN_RDD1.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD1.parquet")
FULL_MAIN_RDD2.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD2.parquet")
...
这种方法的问题是我需要其他数据帧来保持行对齐,而进行这种随机拆分会使数据帧不对齐。
所以我的两个问题是:
- 当我的数据集中没有任何行号或每一行的数字计数器时,是否可以按相对相等的数量拆分多个数据帧?
- 有没有办法在pandas或numpy中批量读取parquet文件?这将基本上解决我在下游系统上的问题。我无法弄清楚如何批量打开镶木地板(我尝试在 pandas 中打开它,然后拆分行并保存每个文件,但是当我加载数据框时它会使我的系统崩溃)。我不确定是否可以不超出内存。
Parquet 文件格式支持行组。安装 pyarrow
并在创建 parquet 文件时使用 row_groups
:
df.to_parquet("filename.parquet", row_group_size=10000, engine="pyarrow")
然后你可以逐组阅读(甚至只读特定组):
import pyarrow.parquet as pq
pq_file = pq.ParquetFile("filename.parquet")
n_groups = pq_file.num_row_groups
for grp_idx in range(n_groups):
df = pq_file.read_row_group(grp_idx, use_pandas_metadata=True).to_pandas()
process(df)
如果您无法控制 parquet 文件的创建,您仍然只能读取文件的一部分:
pq_file = pq.ParquetFile("filename.parquet")
batch_size = 10000 # records
batches = pq_file.iter_batches(batch_size, use_pandas_metadata=True) # batches will be a generator
for batch in batches:
df = batch.to_pandas()
process(df)
我不确定你是否有火花。如果您想提供下游较小的文件块,您可以使用重新分区到所需数量的块并重写 parquet 文件。
您可以根据需要更改分区号。
df = spark.read.parquet('filename.parquet')
df.repartition(200).mode('overwrite').save('targetPath')
我为 pandas、numpy 和 spark 标签发布这个,因为我不确定在这三个系统中解决这个问题的最佳方法。
我有一个很大的 parquet 文件,下游进程无法打开它,因为它超出了系统内存(如果一次打开,内存约为 63gb)。我是这样写文件的:
FULL_MAIN.write.mode("overwrite").parquet(PATH+"/FULL_MAIN.parquet")
但是文件太大,所以我尝试这样做将文件分成更小的夹头:
split_factor = [.1,.1,.1,.1,.1,.1,.1,.1,.1,.1]
FULL_MAIN_RDD1,FULL_MAIN_RDD2,FULL_MAIN_RDD3,FULL_MAIN_RDD4,FULL_MAIN_RDD5, FULL_MAIN_RDD6,FULL_MAIN_RDD7,FULL_MAIN_RDD8,FULL_MAIN_RDD9,FULL_MAIN_RDD10 = FULL_MAIN.randomSplit(split_factor)
FULL_MAIN_RDD1.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD1.parquet")
FULL_MAIN_RDD2.write.mode("overwrite").parquet(PATH+"/FULL_MAIN_RDD2.parquet")
...
这种方法的问题是我需要其他数据帧来保持行对齐,而进行这种随机拆分会使数据帧不对齐。
所以我的两个问题是:
- 当我的数据集中没有任何行号或每一行的数字计数器时,是否可以按相对相等的数量拆分多个数据帧?
- 有没有办法在pandas或numpy中批量读取parquet文件?这将基本上解决我在下游系统上的问题。我无法弄清楚如何批量打开镶木地板(我尝试在 pandas 中打开它,然后拆分行并保存每个文件,但是当我加载数据框时它会使我的系统崩溃)。我不确定是否可以不超出内存。
Parquet 文件格式支持行组。安装 pyarrow
并在创建 parquet 文件时使用 row_groups
:
df.to_parquet("filename.parquet", row_group_size=10000, engine="pyarrow")
然后你可以逐组阅读(甚至只读特定组):
import pyarrow.parquet as pq
pq_file = pq.ParquetFile("filename.parquet")
n_groups = pq_file.num_row_groups
for grp_idx in range(n_groups):
df = pq_file.read_row_group(grp_idx, use_pandas_metadata=True).to_pandas()
process(df)
如果您无法控制 parquet 文件的创建,您仍然只能读取文件的一部分:
pq_file = pq.ParquetFile("filename.parquet")
batch_size = 10000 # records
batches = pq_file.iter_batches(batch_size, use_pandas_metadata=True) # batches will be a generator
for batch in batches:
df = batch.to_pandas()
process(df)
我不确定你是否有火花。如果您想提供下游较小的文件块,您可以使用重新分区到所需数量的块并重写 parquet 文件。 您可以根据需要更改分区号。
df = spark.read.parquet('filename.parquet')
df.repartition(200).mode('overwrite').save('targetPath')