如何创建以当前日期作为输入的光束模板(每日更新)[从 GET 请求创建]

How to create a beam template with current date as an input (updated daily) [Create from GET request]

我正在尝试使用 Cloud Scheduler 每天创建一个数据流作业 运行。我需要使用 GET 请求从外部 API 获取数据,因此我需要当前日期作为输入。但是,当我将数据流作业导出为计划模板时,输入的日期停留在执行时间,而不是每天更新。我一直在四处寻找解决方案,遇到了 ValueProvider,但我的管道用 apache_beam.transforms.Create 表示总是 return 错误 'RuntimeValueProvider(option: test, type: str, default_value: 'killme').get() 未从 运行时间上下文调用' 当未指定 ValueProvider 时。

有没有办法克服这个问题?这似乎是一个如此简单的问题,但无论如何我都无法解决它。如果有任何想法,我将不胜感激!!

您可以使用 ValueProvider 接口将运行时参数传递给管道,要在 DoFn 中访问它,您需要将其作为参数传递。类似于此处的以下示例:

https://beam.apache.org/documentation/patterns/pipeline-options/#retroactively-logging-runtime-parameters

class LogValueProvidersFn(beam.DoFn):
  def __init__(self, string_vp):
    self.string_vp = string_vp

  # Define the DoFn that logs the ValueProvider value.
  # The DoFn is called when creating the pipeline branch.
  # This example logs the ValueProvider value, but
  # you could store it by pushing it to an external database.
  def process(self, an_int):
    logging.info('The string_value is %s' % self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is:
    logging.info(
        'The string value is %s' %
        RuntimeValueProvider.get_value('string_value', str, ''))

  | beam.Create([None])
  | 'LogValueProvs' >> beam.ParDo(
      LogValueProvidersFn(my_options.string_value)))

您可能还想看看 Flex 模板:

https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates