Dataflow 模板是否支持 BigQuery 接收器选项的模板输入?
Does Dataflow templating supports template input for BigQuery sink options?
因为我有一个工作的静态数据流 运行,我想从这个模板创建一个模板,让我无需输入任何命令行即可轻松重用数据流。
按照官方的Creating Templates教程没有提供templatable输出的示例。
我的数据流以一个 BigQuery 接收器结束,它接受一些参数,例如用于存储的目标 table。这个确切的参数是我想在我的模板中提供的参数,允许我在 运行 流程之后选择目标存储。
但是,我无法正常工作。下面我粘贴了一些代码片段,它们可以帮助解释我遇到的确切问题。
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my-source-bucket/file.json')
parser.add_value_provider_argument(
'--table',
default='my-project-id:some-dataset.some-table')
pipeline_options = PipelineOptions()
pipe = beam.Pipeline(options=pipeline_options)
custom_options = pipeline_options.view_as(CustomOptions)
(...)
# store
processed_pipe | beam.io.Write(BigQuerySink(
table=custom_options.table.get(),
schema='a_column:STRING,b_column:STRING,etc_column:STRING',
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
))
创建模板时,我没有给它任何参数。一瞬间我收到以下错误消息:
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context
当我在创建模板时添加 --table
参数时,正在创建模板,但是 --table
参数值然后在模板中被硬编码并且不会被 [= 的任何给定模板值覆盖15=] 之后。
当我将 table=custom_options.table.get(),
替换为 table=StaticValueProvider(str, custom_options.table.get())
时出现相同的错误。
是否有人已经构建了具有可自定义 BigQuerySink 参数的模板table 数据流?我很想得到一些提示。
Python 目前仅支持 FileBasedSource IOs 的 ValueProvider 选项。您可以通过单击您提到的 link 处的 Python 选项卡来查看:
https://cloud.google.com/dataflow/docs/templates/creating-templates
在 "Pipeline I/O and runtime parameters" 部分下。
与 Java 中发生的情况不同,Python 中的 BigQuery 不使用自定义源。换句话说,它并没有完全在SDK中实现,但也包含了后端的部分(因此它是一个"native source")。只有自定义源可以使用模板。计划将 BigQuery 添加为自定义来源:issues.apache.org/jira/browse/BEAM-1440
因为我有一个工作的静态数据流 运行,我想从这个模板创建一个模板,让我无需输入任何命令行即可轻松重用数据流。
按照官方的Creating Templates教程没有提供templatable输出的示例。
我的数据流以一个 BigQuery 接收器结束,它接受一些参数,例如用于存储的目标 table。这个确切的参数是我想在我的模板中提供的参数,允许我在 运行 流程之后选择目标存储。
但是,我无法正常工作。下面我粘贴了一些代码片段,它们可以帮助解释我遇到的确切问题。
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my-source-bucket/file.json')
parser.add_value_provider_argument(
'--table',
default='my-project-id:some-dataset.some-table')
pipeline_options = PipelineOptions()
pipe = beam.Pipeline(options=pipeline_options)
custom_options = pipeline_options.view_as(CustomOptions)
(...)
# store
processed_pipe | beam.io.Write(BigQuerySink(
table=custom_options.table.get(),
schema='a_column:STRING,b_column:STRING,etc_column:STRING',
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
))
创建模板时,我没有给它任何参数。一瞬间我收到以下错误消息:
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context
当我在创建模板时添加 --table
参数时,正在创建模板,但是 --table
参数值然后在模板中被硬编码并且不会被 [= 的任何给定模板值覆盖15=] 之后。
当我将 table=custom_options.table.get(),
替换为 table=StaticValueProvider(str, custom_options.table.get())
时出现相同的错误。
是否有人已经构建了具有可自定义 BigQuerySink 参数的模板table 数据流?我很想得到一些提示。
Python 目前仅支持 FileBasedSource IOs 的 ValueProvider 选项。您可以通过单击您提到的 link 处的 Python 选项卡来查看: https://cloud.google.com/dataflow/docs/templates/creating-templates
在 "Pipeline I/O and runtime parameters" 部分下。
与 Java 中发生的情况不同,Python 中的 BigQuery 不使用自定义源。换句话说,它并没有完全在SDK中实现,但也包含了后端的部分(因此它是一个"native source")。只有自定义源可以使用模板。计划将 BigQuery 添加为自定义来源:issues.apache.org/jira/browse/BEAM-1440