How to add a column index to Parquet output from the Apache Beam Python SDK?

I'm trying to batch-process .avro files from GCS and write the results back to GCS as Parquet files. The data is a time series and the elements are timestamped. How can I create a column index from the timestamp column in the Parquet output? In Pandas/Dask it's a simple .set_index('timestamp') call.

import apache_beam as beam
from apache_beam.transforms import window
import pyarrow


class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        yield beam.window.TimestampedValue(element, element['timestamp'])


with beam.Pipeline(options=pipeline_options) as pipeline:
    p = pipeline | 'ReadAvro' >> beam.io.ReadFromAvro('gs://input/*.avro')

    timestamped_items = p | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
    fixed_windowed_items = (timestamped_items
                            | 'window' >> beam.WindowInto(window.FixedWindows(60)))
    processed_items = fixed_windowed_items | 'compute' >> beam.ParDo(ComputeDoFn())

    _ = processed_items | beam.io.WriteToParquet(
        'gs://output/out.parquet',
        pyarrow.schema([
            ('timestamp', pyarrow.timestamp('s')),
            ...

Beam's beam.io.WriteToParquet uses Arrow's ParquetWriter to write Parquet files, and I'm not seeing any way to set the index with that writer. However, you could use Beam's DataFrame support to convert your PCollection to a DataFrame, set the index, and then call to_parquet(...), which delegates to the underlying pandas implementation and should write out the index.