使用“FileSystemDataset.from_paths”构造pyarrow数据集时,如何指定分区?

When constructing a pyarrow dataset with `FileSystemDataset.from_paths`, how do I specify partitions?

我在 google 存储桶中有一个 CSV 文件列表,组织方式类似于 gs://bucket/some_dir/{partition_value}/filename。我想从这样的 URI 列表中创建一个 pyarrow.Dataset(它是 some_dir 中文件的子集)。

如何执行此操作并将 partition_value 提取为列?

到目前为止我有:

import gcsfs
import pyarrow as pa
import pyarrow.csv
import pyarrow.dataset as ds
from pyarrow.fs import FSSpecHandler, PyFileSystem

fs = gcsfs.GCSFileSystem()
schema = pa.schema([("gene_id", pa.string()), ("raw_count", pa.float32()), ("scaled_estimate", pa.float32())])

# these data are publicly accessible, btw
uris = [
    "gs://gdc-tcga-phs000178-open/0b8b258e-1671-4f86-82e7-59b12ad40d9c/unc.edu.4c243ea9-dfe1-42f0-a887-3c901fb38542.2477720.rsem.genes.results",
    "gs://gdc-tcga-phs000178-open/c8ee8367-c529-4dd6-98b4-fde57991134b/unc.edu.a64ae1f5-a189-4173-be13-903bd7637869.2476757.rsem.genes.results",
    "gs://gdc-tcga-phs000178-open/78354f8d-5ce8-4617-bba4-79614f232e97/unc.edu.ac19f7cf-670b-4dcc-a26b-db0f56377231.2509607.rsem.genes.results",
]

dataset = ds.FileSystemDataset.from_paths(
    uris,
    schema,
    format=ds.CsvFileFormat(parse_options=pa.csv.ParseOptions(delimiter="\t")),
    filesystem=PyFileSystem(FSSpecHandler(fs)),
    # partitions=["bucket", "file_gcs_id"],
    # root_partition="gdc-tcga-phs000178-open",
)

dataset.to_table()

这给了我一个很好的 table 我的架构中的字段。

但是,我希望 partition_key 成为我数据集中的另一个字段。我猜我需要:

  1. 将此作为字段添加到我的架构中,并且
  2. 调用时添加一些东西FileSystemDataset.from_paths

我尝试摆弄 root_partition,但收到一个错误,提示我提供的字符串不是 pyarrow.Expression(不知道那是什么)。我也尝试指定 partitions 但我得到 ValueError: The number of files resulting from paths_or_selector must be equal to the number of partitions.

数据集发现期间 使用文件名信息(连同指定的分区)生成附加到片段的“保证”。例如,当我们看到文件 foo/x=7/bar.parquet 并且我们正在使用“hive 分区”时,我们可以附加保证 x == 7。由于各种原因,我们暂时不需要讨论这些保证作为“表达式”存储。

我想到了两个解决方案。首先,您可以自己创建保证并将它们附加到您的路径(这就是 partitions 参数在 from_paths 方法中表示的内容)。表达式应为 ds.field("column_name") == value.

其次,您可以允许数据集发现过程 运行 正常进行。这将生成您需要的所有片段(以及一些您不需要的片段),并且已经附加了保证。然后,您可以 trim 将片段列表向下移动到您想要的片段列表,并从中创建一个数据集。

(I'm guessing I need) to add this as a field to my schema

是的。在上述两种方法中,您都需要确保将分区列添加到架构中。

下面是展示这两种方法的代码示例:

import shutil

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as fs

shutil.rmtree('my_dataset', ignore_errors=True)

table = pa.Table.from_pydict({
    'x': [1, 2, 3, 4, 5, 6],
    'part': ['a', 'a', 'a', 'b', 'b', 'b']
    })

ds.write_dataset(table, 'my_dataset', partitioning=['part'], format='parquet')

print('# Created by dataset factory')
partitioning = ds.partitioning(schema=pa.schema([pa.field('part', pa.string())]))
dataset = ds.dataset('my_dataset',partitioning=partitioning)
print(dataset.to_table())
print()

desired_paths = [
    'my_dataset/a/part-0.parquet'
]

# Note that table.schema used below includes the partitioning
# column so we've added that to the schema.
print('# Created from paths')
filesystem = fs.LocalFileSystem()
dataset_from_paths = ds.FileSystemDataset.from_paths(
    desired_paths,
    table.schema,
    format=ds.ParquetFileFormat(),
    filesystem=filesystem)
print(dataset_from_paths.to_table())
print()

print('# Created from paths with explicit partition information')
dataset_from_paths = ds.FileSystemDataset.from_paths(
    desired_paths,
    table.schema,
    partitions=[
        ds.field('part') == "a"
    ],
    format=ds.ParquetFileFormat(),
    filesystem=filesystem)
print(dataset_from_paths.to_table())
print()

print('# Created from discovery then trimmed')
trimmed_fragments = [frag for frag in dataset.get_fragments() if frag.path in desired_paths]
trimmed_dataset = ds.FileSystemDataset(trimmed_fragments, dataset.schema, dataset.format, filesystem=dataset.filesystem)
print(trimmed_dataset.to_table())