Calling beam.io.WriteToBigQuery in a beam.DoFn

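I have created a Dataflow template with some parameters. When I write the data to BigQuery, I would like to use these parameters to determine which table it should be written to. I have tried calling WriteToBigQuery in a ParDo, as suggested in the following link.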

The pipeline runs successfully, but it does not create the table or load any data into BigQuery. Any idea what the problem might be?

def run():
  pipeline_options = PipelineOptions()
  pipeline_options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

  with beam.Pipeline(options=pipeline_options) as p:
    custom_options = pipeline_options.view_as(CustomOptions)

    _ = (
      p
      | beam.Create([None])
      | 'Year to periods' >> beam.ParDo(SplitYearToPeriod(custom_options.year))
      | 'Read plan data' >> beam.ParDo(GetPlanDataByPeriod(custom_options.secret_name))
      | 'Transform record' >> beam.Map(transform_record)
      | 'Write to BQ' >> beam.ParDo(WritePlanDataToBigQuery(custom_options.year))
    )

if __name__ == '__main__':
  run()

class CustomOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--year', type=int)
    parser.add_value_provider_argument('--secret_name', type=str)

class WritePlanDataToBigQuery(beam.DoFn):
  def __init__(self, year_vp):
    self._year_vp = year_vp

  def process(self, element):
    year = self._year_vp.get()

    table = f's4c.plan_data_{year}'
    schema = {
      'fields': [ ...some fields properties ]
    }

    beam.io.WriteToBigQuery(
      table=table,
      schema=schema,
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
      method=beam.io.WriteToBigQuery.Method.FILE_LOADS
    )

You have instantiated the PTransform beam.io.gcp.bigquery.WriteToBigQuery inside the process method of your DoFn. There are a couple of problems here:

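  • The process method is called for each element of the input PCollection. It is not used for building the pipeline graph, so this approach of constructing the graph dynamically will not work.
  • Once you move it out of the DoFn, you need to apply the PTransform beam.io.gcp.bigquery.WriteToBigQuery to a PCollection for it to have any effect. See the Beam pydoc or the Beam tutorial documentation.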
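
As a rough sketch of the second point, the write can be applied as an ordinary step while the pipeline is being built, rather than constructed inside process. The table name and schema string below are placeholders invented for illustration (the question elides the real schema), and write_step_kwargs and add_write_step are hypothetical helper names. apache_beam is imported inside the function only so that the snippet can be loaded without Beam installed.

```python
# Placeholder values for illustration only; not taken from the question.
EXAMPLE_TABLE = 's4c.plan_data_2020'           # hypothetical fixed table name
EXAMPLE_SCHEMA = 'period:STRING,amount:FLOAT'  # hypothetical schema string


def write_step_kwargs(table, schema):
    """Collect the keyword arguments for WriteToBigQuery in one place."""
    return {
        'table': table,
        'schema': schema,
        # The BigQueryDisposition constants are plain strings with these values.
        'create_disposition': 'CREATE_IF_NEEDED',
        'write_disposition': 'WRITE_TRUNCATE',
    }


def add_write_step(records):
    """Apply WriteToBigQuery to a PCollection at pipeline-construction time."""
    import apache_beam as beam  # lazy import; needs apache-beam[gcp]

    kwargs = write_step_kwargs(EXAMPLE_TABLE, EXAMPLE_SCHEMA)
    # The transform only takes effect because it is applied with `|`.
    return records | 'Write to BQ' >> beam.io.WriteToBigQuery(
        method=beam.io.WriteToBigQuery.Method.FILE_LOADS, **kwargs)
```

In the question's pipeline this would take the place of the final beam.ParDo(WritePlanDataToBigQuery(...)) step, with the output of 'Transform record' passed in as records.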

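To create a derived value provider for your table name, you would need a "nested" value provider. Unfortunately, this is not supported for the Python SDK. You can, however, use the value provider option directly.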
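
Using the value provider directly means passing the whole table name as a template parameter and handing that provider to WriteToBigQuery as its table argument. If the name really has to be derived from --year, one workaround that may be worth trying is to pass a callable as table instead: the table argument is documented as accepting a str, a callable, or a ValueProvider, and a callable is evaluated per element at run time, when calling .get() is allowed. The sketch below rests on that assumption and is not something the answer above prescribes. table_for_year is a hypothetical helper, and StubProvider exists only to exercise it locally without Beam.

```python
def table_for_year(year_provider, prefix='s4c.plan_data_'):
    """Build a callable suitable for WriteToBigQuery's `table` argument.

    The provider is read only inside the returned function, which is called
    while elements are processed, not while the pipeline is constructed.
    """
    def _table(element):
        # `element` is ignored: every row goes to the table for the given year.
        return f'{prefix}{year_provider.get()}'
    return _table


class StubProvider:
    """Stand-in for a ValueProvider; for local experimentation only."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value
```

In the pipeline this would be wired up as beam.io.WriteToBigQuery(table=table_for_year(custom_options.year), ...), applied to the PCollection of records. It is worth checking against your Beam version that the chosen write method supports callable destinations.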

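As an advanced option, you may be interested in trying out "flex templates", which essentially package your whole program as a Docker image and execute it with parameters.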
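
With a flex template the pipeline graph is built when the job is launched, so parameters arrive as ordinary command-line values and the table name can be composed with plain string formatting. A minimal sketch of that idea, assuming a --year flag like the one in the question; parse_year and plan_data_table are hypothetical helper names:

```python
import argparse


def parse_year(argv=None):
    """Split out --year and return it with the remaining pipeline arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--year', type=int, required=True)
    known, pipeline_args = parser.parse_known_args(argv)
    return known.year, pipeline_args


def plan_data_table(year):
    """Compose the destination table name used in the question."""
    return f's4c.plan_data_{year}'
```

The remaining arguments can then be passed to PipelineOptions(pipeline_args), and plan_data_table(year) can be given to WriteToBigQuery as a plain string.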

If the objective is for the code to accept parameters instead of a hard-coded string for the table path, here is one way to achieve that:

  • Add the table parameters as CustomOptions
  • Inside your run function, add the CustomOptions parameters as the default strings
...

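import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    DebugOptions, PipelineOptions, SetupOptions)

# Split (a DoFn) and Schema are assumed to be defined elsewhere in the template.


class CustomOptions(PipelineOptions):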
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--gcs_input_file_path',
            type=str,
            help='GCS Input File Path'
        )
        parser.add_value_provider_argument(
            '--project_id',
            type=str,
            help='GCP ProjectID'
        )
        parser.add_value_provider_argument(
            '--dataset',
            type=str,
            help='BigQuery DataSet Name'
        )
        parser.add_value_provider_argument(
            '--table',
            type=str,
            help='BigQuery Table Name'
        )

def run(argv=None):

    pipeline_option = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_option)
    custom_options = pipeline_option.view_as(CustomOptions)
    pipeline_option.view_as(SetupOptions).save_main_session = True
    pipeline_option.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--gcp_project_id',
        type=str,
        help='GCP ProjectID',
        default=str(custom_options.project_id)
    )
    parser.add_argument(
        '--dataset',
        type=str,
        help='BigQuery DataSet Name',
        default=str(custom_options.dataset)
    )
    parser.add_argument(
        '--table',
        type=str,
        help='BigQuery Table Name',
        default=str(custom_options.table)
    )

    static_options, _ = parser.parse_known_args(argv)
    path = static_options.gcp_project_id + ":" + static_options.dataset + "." + static_options.table

    data = (
            pipeline
            | "Read from GCS Bucket" >>
            beam.io.textio.ReadFromText(custom_options.gcs_input_file_path)
            | "Parse Text File" >>
            beam.ParDo(Split())
            | 'WriteToBigQuery' >>
            beam.io.WriteToBigQuery(
                path,
                schema=Schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
    )

    result = pipeline.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

  • Pass the table path at pipeline construction time in the shell file

python template.py \
  --gcp_project_id project_name \
  --dataset dataset_name \
  --table table_name \
  --project project_name \
  --runner DataflowRunner \
  --region region_name \
  --staging_location gs://bucket_name/staging \
  --temp_location gs://bucket_name/temp \
  --template_location gs://bucket_name/templates/template_name