Dataflow 模板是否支持 BigQuery 接收器选项的模板输入?

Does Dataflow templating supports template input for BigQuery sink options?

因为我有一个工作的静态数据流 运行,我想从这个模板创建一个模板,让我无需输入任何命令行即可轻松重用数据流。

按照官方的Creating Templates教程没有提供templatable输出的示例。

我的数据流以一个 BigQuery 接收器结束,它接受一些参数,例如用于存储的目标 table。这个确切的参数是我想在我的模板中提供的参数,允许我在 运行 流程之后选择目标存储。

但是,我无法正常工作。下面我粘贴了一些代码片段,它们可以帮助解释我遇到的确切问题。

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://my-source-bucket/file.json')
        parser.add_value_provider_argument(
            '--table',
            default='my-project-id:some-dataset.some-table')

pipeline_options = PipelineOptions()

pipe = beam.Pipeline(options=pipeline_options)

custom_options = pipeline_options.view_as(CustomOptions)

(...)

# store
processed_pipe | beam.io.Write(BigQuerySink(
    table=custom_options.table.get(),
    schema='a_column:STRING,b_column:STRING,etc_column:STRING',
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND
))

创建模板时,我没有给它任何参数。一瞬间我收到以下错误消息:

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context

当我在创建模板时添加 --table 参数时,正在创建模板,但是 --table 参数值然后在模板中被硬编码并且不会被 [= 的任何给定模板值覆盖15=] 之后。

当我将 table=custom_options.table.get(), 替换为 table=StaticValueProvider(str, custom_options.table.get()) 时出现相同的错误。

是否有人已经构建了具有可自定义 BigQuerySink 参数的模板table 数据流?我很想得到一些提示。

Python 目前仅支持 FileBasedSource IOs 的 ValueProvider 选项。您可以通过单击您提到的 link 处的 Python 选项卡来查看: https://cloud.google.com/dataflow/docs/templates/creating-templates

在 "Pipeline I/O and runtime parameters" 部分下。

与 Java 中发生的情况不同,Python 中的 BigQuery 不使用自定义源。换句话说,它并没有完全在SDK中实现,但也包含了后端的部分(因此它是一个"native source")。只有自定义源可以使用模板。计划将 BigQuery 添加为自定义来源:issues.apache.org/jira/browse/BEAM-1440