使用 Apache Beam Python SDK 将文件写入 Parquet 中的动态目标

Writing files to dynamic destinations in Parquet using Apache Beam Python SDK

我正在尝试通过 WriteToFiles class.

使用 dynamic destinations 编写 Parquet 文件

我什至发现了一些进一步开发的示例,例如 ,他们在其中构建了自定义 Avro 文件接收器。

我目前正在尝试使用 pyarrow 库编写一个 Parquet 接收器,它可以以分布式方式管理写入操作,类似于 WriteToParquet PTransform 的完成方式。

class ParquetFileSink(fileio.FileSink):
    def __init__(self, schema, codec='deflate'):
        self._schema = schema
        self._codec = codec
        self.writer = None

    def open(self, fh):
        # This is called on every new bundle.
        self.writer = pq.ParquetWriter(
            fh,
            self._schema,
            compression=self._codec,
            use_deprecated_int96_timestamps=False
        )

    def write(self, record):
        # This is called on every element.
        row = pa.Table.from_pandas(
            pd.DataFrame(record), schema=self._schema, preserve_index=False
        )
        self.writer.write_table(row)

    def flush(self):
        pass

这里的主要问题是,据我所知,将无界 PCollections 写成 Parquet 文件是不可能的,所以如果我尝试使用以下 class 按记录写入,要么我得到在关闭的文件处理程序上写入错误,或者根本没有创建某些文件。 我还尝试使用 GroupByKey PTransform 编写批处理,但是由于无法关闭 pyarrow.parquet.ParquetWriter 对象,文件最终只被部分写入并被损坏。此外,这种策略并不安全,因为批次可能非常大,将它们写为单个文件并不是一个好主意。

我可以看到 class apache_beam.io.parquetio._ParquetSink 中正面临这个问题,但我认为这不能直接应用于 WriteToFiles class因为我看不到如何使用它来完全管理文件处理程序。

parquet 格式针对批量写入数据进行了优化。因此,它不适合流式传输,您可以一张接一张地接收记录。在您的示例中,您在镶木地板文件中逐行写入,这是非常低效的。

我建议以适合逐行附加数据的格式保存您的数据,然后定期将这些数据分批移动到 parquet 文件。

或者您可以像 apache_beam.io.parquetio._ParquetSink 那样做。它将记录保存在内存中的缓冲区中,并时不时地批量写入它们。但是有了这个,如果您的应用程序崩溃,您 运行 将面临丢失缓冲区中记录的风险。

我遇到了类似的问题,最后我写了一个可以与 WriteToFiles 一起使用的 ParquetSink。因此,根据您的配置,它会批量处理内存中的记录。 我已经使用它在依赖于记录中的字段的批处理过程中创建动态文件,但我认为它也适用于流媒体管道,尽管我没有测试过它。

您可以在 this gist

中找到代码
class ParquetSink(fileio.FileSink):
    def __init__(self,
                file_path_prefix,
                schema,
                row_group_buffer_size=64 * 1024 * 1024,
                record_batch_size=1000,
                codec='none',
                use_deprecated_int96_timestamps=False,
                file_name_suffix='',
                num_shards=0,
                shard_name_template=None,
                mime_type='application/x-parquet'):
        self._inner_sink = beam.io.parquetio._create_parquet_sink(
            file_path_prefix,
            schema,
            codec,
            row_group_buffer_size,
            record_batch_size,
            use_deprecated_int96_timestamps,
            file_name_suffix,
            num_shards,
            shard_name_template,
            mime_type
        )
        self._codec = codec
        self._schema = schema
        self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps

    def open(self, fh):
        self._pw = pyarrow.parquet.ParquetWriter(
            fh,
            self._schema,
            compression=self._codec,
            use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)

    def write(self, record):
        self._inner_sink.write_record(self._pw, record)

    def flush(self):
        if len(self._inner_sink._buffer[0]) > 0:
            self._inner_sink._flush_buffer()
        if self._inner_sink._record_batches_byte_size > 0:
            self._inner_sink._write_batches(self._pw)

        self._pw.close()



   def parquet_compatible_filenaming(suffix=None):
    def _inner(window, pane, shard_index, total_shards, compression, destination):
        return fileio.destination_prefix_naming(suffix )(
            window, pane, shard_index, total_shards, compression, destination).replace(":", ".")

    return _inner


def get_parquet_pipeline(pipeline_options, input, output):
    with beam.Pipeline(options=pipeline_options) as p:
        lines = (p 
                    | 'Read' >> beam.io.ReadFromParquet(file_pattern=input)
                    | 'Transform' >> beam.Map(lambda x: { 'some_key': x['some_key'], 'raw': x})
                    | 'Write to Parquet' >> fileio.WriteToFiles(
                                                path=str(output),
                                                destination=lambda x: x["some_key"],
                                                sink=lambda x: ParquetSink(
                                                                    file_path_prefix=output,
                                                                    file_name_suffix=".parquet",
                                                                    codec="snappy",
                                                                    schema=pyarrow.schema([
                                                                        pyarrow.field("some_key", pyarrow.string()),
                                                                        pyarrow.field("raw", pyarrow.string())
                                                                    ])),
                                                    file_naming=parquet_compatible_filenaming(suffix=".parquet")
                                                )
        )