使用 ValueProvider 在 Dataflow 中格式化 BigQuery
use ValueProvider to format a BigQuery in Dataflow
我目前正在使用 Dataflow 在 python 中进行循环批处理。
基本上我从 bigquery 读取数据并对其进行处理。我的管道看起来像这样
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
| "doing stuff" >> beam.Map(do_some_stuff)
)
我想 运行 使用数据流模板的作业使其适应 运行 时间。
感谢文档 https://cloud.google.com/dataflow/docs/guides/templates/creating-templates,在你的函数部分使用 ValueProvider,我设法从 运行time 使用 ParDo 给 "do_some_stuff" 一个额外的参数。
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
但我还想更改进程关注的用户数量,因此我想使查询适应 运行时间。
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_nb_users',
default=100,
type=int)
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
... 这不起作用,因为我在管道执行之前调用了 get() 。到目前为止,我没有设法将我为 do_some_stuff 函数所做的调整适应 "Read" 行
任何关于如何进行的建议或解决方案将不胜感激。谢谢!
遗憾的是,BigQuerySource
不支持值提供程序。这是因为它是在数据流运行器中本地实现的,因此所有信息都需要在管道构建时可用。
您可以尝试转换 apache_beam.io.gcp.bigquery.ReadFromBigQuery
- 这将允许您使用值提供程序。
我目前正在使用 Dataflow 在 python 中进行循环批处理。
基本上我从 bigquery 读取数据并对其进行处理。我的管道看起来像这样
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
| "doing stuff" >> beam.Map(do_some_stuff)
)
我想 运行 使用数据流模板的作业使其适应 运行 时间。
感谢文档 https://cloud.google.com/dataflow/docs/guides/templates/creating-templates,在你的函数部分使用 ValueProvider,我设法从 运行time 使用 ParDo 给 "do_some_stuff" 一个额外的参数。
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
但我还想更改进程关注的用户数量,因此我想使查询适应 运行时间。
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_nb_users',
default=100,
type=int)
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
... 这不起作用,因为我在管道执行之前调用了 get() 。到目前为止,我没有设法将我为 do_some_stuff 函数所做的调整适应 "Read" 行
任何关于如何进行的建议或解决方案将不胜感激。谢谢!
遗憾的是,BigQuerySource
不支持值提供程序。这是因为它是在数据流运行器中本地实现的,因此所有信息都需要在管道构建时可用。
您可以尝试转换 apache_beam.io.gcp.bigquery.ReadFromBigQuery
- 这将允许您使用值提供程序。