如何在 DataflowStartFlexTemplateOperator 中给出输出文件名
How to give output filename in DataflowStartFlexTemplateOperator
我在气流中使用 DataflowStartFlexTemplateOperator dag 运算符将我的 bigquery 外部 table 数据导出为具有所需输出文件数量的镶木地板文件格式。
但是这里如何给输出文件名。至少是文件名前缀。
下面是我的代码
export_to_gcs = DataflowStartFlexTemplateOperator(
task_id=f'export_to_gcs_day_{day_num}',
project_id=PROJECT_ID,
body={
'launchParameter': {
'containerSpecGcsPath': 'gs://dataflow-templates-us-central1/latest/flex/BigQuery_to_Parquet',
'jobName': f'tivo-export-to-gcs-{run_date}',
'environment': {
'stagingLocation': f'gs://{GCS_BUCKET_NAME}/{STAGING_LOCATION}',
'numWorkers': '1',
'maxWorkers': '20',
'workerRegion': 'us-central1',
'serviceAccountEmail': DF_SA_NAME,
'machineType': 'n1-standard-4',
'ipConfiguration': 'WORKER_IP_PRIVATE',
'tempLocation': f'gs://{GCS_BUCKET_NAME}/{TEMP_LOCATION}',
'subnetwork': SUBNETWORK,
'enableStreamingEngine': False
},
'parameters': {
'tableRef': f'{PROJECT_ID}:{DATASET_NAME}.native_firehose_table',
'bucket': f'gs://{GCS_BUCKET_NAME}/Tivo/site_activity/{run_date}/',
'numShards': '25',
},
}
},
location=REGION_NAME,
wait_until_finished=True,
dag=dag
)
"BigQuery_to_Parquet" 模板不允许您添加前缀。为此,您实际上需要 download the template 并创建自定义版本。
这是 source code 中写入文件的部分:
* Step 2: Write records to Google Cloud Storage as one or more Parquet files
* via {@link ParquetIO}.
*/
.apply(
"WriteToParquet",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema))
.to(options.getBucket())
.withNumShards(options.getNumShards())
.withSuffix(FILE_SUFFIX));
也许您可以从其他实际创建的支持前缀的模板中获得一些灵感,例如 streaming data generator,但请记住,它的用途不同,而且是流式处理而不是批处理。
StreamingDataGeneratorWriteToGcs.java从参数中取前缀参数:
/** Converts the fake messages in bytes to json format and writes to a text file. */
private void writeAsParquet(PCollection<GenericRecord> genericRecords, Schema avroSchema) {
genericRecords.apply(
"Write Parquet output",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(avroSchema))
.to(getPipelineOptions().getOutputDirectory())
.withPrefix(getPipelineOptions().getOutputFilenamePrefix())
.withSuffix(getPipelineOptions().getOutputType().getFileExtension())
.withNumShards(getPipelineOptions().getNumShards()));
}
另一种选择是将文件导出到一个非常特定的文件夹中,在该文件夹中它们不能与任何其他进程混合,并在后续任务中应用批量重命名。
我在气流中使用 DataflowStartFlexTemplateOperator dag 运算符将我的 bigquery 外部 table 数据导出为具有所需输出文件数量的镶木地板文件格式。 但是这里如何给输出文件名。至少是文件名前缀。 下面是我的代码
export_to_gcs = DataflowStartFlexTemplateOperator(
task_id=f'export_to_gcs_day_{day_num}',
project_id=PROJECT_ID,
body={
'launchParameter': {
'containerSpecGcsPath': 'gs://dataflow-templates-us-central1/latest/flex/BigQuery_to_Parquet',
'jobName': f'tivo-export-to-gcs-{run_date}',
'environment': {
'stagingLocation': f'gs://{GCS_BUCKET_NAME}/{STAGING_LOCATION}',
'numWorkers': '1',
'maxWorkers': '20',
'workerRegion': 'us-central1',
'serviceAccountEmail': DF_SA_NAME,
'machineType': 'n1-standard-4',
'ipConfiguration': 'WORKER_IP_PRIVATE',
'tempLocation': f'gs://{GCS_BUCKET_NAME}/{TEMP_LOCATION}',
'subnetwork': SUBNETWORK,
'enableStreamingEngine': False
},
'parameters': {
'tableRef': f'{PROJECT_ID}:{DATASET_NAME}.native_firehose_table',
'bucket': f'gs://{GCS_BUCKET_NAME}/Tivo/site_activity/{run_date}/',
'numShards': '25',
},
}
},
location=REGION_NAME,
wait_until_finished=True,
dag=dag
)
"BigQuery_to_Parquet" 模板不允许您添加前缀。为此,您实际上需要 download the template 并创建自定义版本。
这是 source code 中写入文件的部分:
* Step 2: Write records to Google Cloud Storage as one or more Parquet files
* via {@link ParquetIO}.
*/
.apply(
"WriteToParquet",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema))
.to(options.getBucket())
.withNumShards(options.getNumShards())
.withSuffix(FILE_SUFFIX));
也许您可以从其他实际创建的支持前缀的模板中获得一些灵感,例如 streaming data generator,但请记住,它的用途不同,而且是流式处理而不是批处理。
StreamingDataGeneratorWriteToGcs.java从参数中取前缀参数:
/** Converts the fake messages in bytes to json format and writes to a text file. */
private void writeAsParquet(PCollection<GenericRecord> genericRecords, Schema avroSchema) {
genericRecords.apply(
"Write Parquet output",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(avroSchema))
.to(getPipelineOptions().getOutputDirectory())
.withPrefix(getPipelineOptions().getOutputFilenamePrefix())
.withSuffix(getPipelineOptions().getOutputType().getFileExtension())
.withNumShards(getPipelineOptions().getNumShards()));
}
另一种选择是将文件导出到一个非常特定的文件夹中,在该文件夹中它们不能与任何其他进程混合,并在后续任务中应用批量重命名。