如何在 DataflowStartFlexTemplateOperator 中给出输出文件名

How to give output filename in DataflowStartFlexTemplateOperator

我在气流中使用 DataflowStartFlexTemplateOperator dag 运算符将我的 bigquery 外部 table 数据导出为具有所需输出文件数量的镶木地板文件格式。 但是这里如何给输出文件名。至少是文件名前缀。 下面是我的代码

    export_to_gcs = DataflowStartFlexTemplateOperator(
        task_id=f'export_to_gcs_day_{day_num}',
        project_id=PROJECT_ID,
        body={
            'launchParameter': {
                'containerSpecGcsPath': 'gs://dataflow-templates-us-central1/latest/flex/BigQuery_to_Parquet',
                'jobName': f'tivo-export-to-gcs-{run_date}',
                'environment': {
                    'stagingLocation': f'gs://{GCS_BUCKET_NAME}/{STAGING_LOCATION}',
                    'numWorkers': '1',
                    'maxWorkers': '20',
                    'workerRegion': 'us-central1',
                    'serviceAccountEmail': DF_SA_NAME,
                    'machineType': 'n1-standard-4',
                    'ipConfiguration': 'WORKER_IP_PRIVATE',
                    'tempLocation': f'gs://{GCS_BUCKET_NAME}/{TEMP_LOCATION}',
                    'subnetwork': SUBNETWORK,
                    'enableStreamingEngine': False
                },
                'parameters': {
                    'tableRef': f'{PROJECT_ID}:{DATASET_NAME}.native_firehose_table',
                    'bucket': f'gs://{GCS_BUCKET_NAME}/Tivo/site_activity/{run_date}/',
                    'numShards': '25',

                },
            }
        },
        location=REGION_NAME,
        wait_until_finished=True,
        dag=dag
    )

"BigQuery_to_Parquet" 模板不允许您添加前缀。为此,您实际上需要 download the template 并创建自定义版本。

这是 source code 中写入文件的部分:

         * Step 2: Write records to Google Cloud Storage as one or more Parquet files
         *         via {@link ParquetIO}.
         */
        .apply(
            "WriteToParquet",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(schema))
                .to(options.getBucket())
                .withNumShards(options.getNumShards())
                .withSuffix(FILE_SUFFIX));

也许您可以从其他实际创建的支持前缀的模板中获得一些灵感,例如 streaming data generator,但请记住,它的用途不同,而且是流式处理而不是批处理。

StreamingDataGeneratorWriteToGcs.java从参数中取前缀参数:

  /** Converts the fake messages in bytes to json format and writes to a text file. */
  private void writeAsParquet(PCollection<GenericRecord> genericRecords, Schema avroSchema) {
    genericRecords.apply(
        "Write Parquet output",
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(avroSchema))
            .to(getPipelineOptions().getOutputDirectory())
            .withPrefix(getPipelineOptions().getOutputFilenamePrefix())
            .withSuffix(getPipelineOptions().getOutputType().getFileExtension())
            .withNumShards(getPipelineOptions().getNumShards()));
  }

另一种选择是将文件导出到一个非常特定的文件夹中,在该文件夹中它们不能与任何其他进程混合,并在后续任务中应用批量重命名。