按值范围重新划分大型镶木地板数据集

Repartition large parquet dataset by ranges of values

我有一个很大的 .parquet 数据集,它被分成 ~256k 块 (20GB)。最近我将它重新打包成 514 个块 (28GB) 以减少文件数量。

我真正需要的是根据包含 int32 值范围从 0 到 99.999.999(大约 200k 不同值)的字段加载数据。

我尝试了一个示例 Writing large amounts of data,但是 pyspark 5 不允许写入这么多分区并引发错误 pyarrow.lib.ArrowInvalid: Fragment would be written into 203094 partitions. This exceeds the maximum of 1024

是否有可能根据提到的字段对数据集进行重新分区,以便每个块都包含值的范围?例如分区 1 (0-99999),分区 2 (100000-199000),...

max_partitions 是可配置的 (pyarrow >= 4.0.0)。您可能会开始 运行 到 ARROW-12321 因为 pyarrow 将为每个分区打开一个文件描述符,并且在收到所有数据之前不会关闭它。然后,您可以修改系统上的最大文件描述符来解决这个问题。

您对分区列进行分组的想法也很不错。这应该会减少您拥有的文件数量(使事情更易于管理),甚至可能会提高性能(即使每个文件将包含更多数据)。不幸的是,这还没有准备好轻松实施。 Arrow 的投影机制是您想要的,但 pyarrow 的数据集表达式并未完全连接到 pyarrow 计算函数(ARROW-12060)。

有一种稍微冗长但更灵活的方法可用。您可以扫描 python 中的批次,应用您想要的任何转换,然后将其公开为数据集编写器将接受的批次迭代器:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
import pyarrow.parquet as pq

table = pa.Table.from_pydict({'x': range(20), 'y': [1] * 20})
pq.write_table(table, '/tmp/foo.parquet')
part = pa.dataset.partitioning(pa.schema([("partition_key", pa.int64())]), flavor='hive')
dataset = pa.dataset.dataset('/tmp/foo.parquet')

scanner = dataset.scanner()
scanner_iter = scanner.scan_batches()

# Arrow doesn't have modulo / integer division yet but we can
# approximate it with masking (ARROW-12755).
# There will be 2^3 items per group.  Adjust items_per_group_exponent
# to your liking for more items per file.                                                                                                                                                                           
items_per_group_exponent = 3
items_per_group_mask = (2 ** items_per_group_exponent) - 1
mask = ((2 ** 63) - 1) ^ items_per_group_mask
def projector():
    while True:
        try:
            next_batch = next(scanner_iter).record_batch
            partition_key_arr = pc.bit_wise_and(next_batch.column('x'), mask)
            all_arrays = [*next_batch.columns, partition_key_arr]
            all_names = [*next_batch.schema.names, 'partition_key']
            batch_with_part = pa.RecordBatch.from_arrays(all_arrays, names=all_names)
            print(f'Yielding {batch_with_part}')
            yield batch_with_part
        except StopIteration:
            return

full_schema = dataset.schema.append(pa.field('partition_key', pa.int64()))
ds.write_dataset(projector(), '/tmp/new_dataset', schema=full_schema, format='parquet', partitioning=part)