从 PCollection 中提取常量值

Extract constant value from a PCollection

我有一个连接器,可以为我提供数百万个条目。其中一些条目有一个字段 date,其值对所有条目都相同,但其他条目则不同。我需要获取此值并将其设置到管道中的每个条目。

如果不是分布式管道,我会这样做:

def set_date(entries):
    for entry in entries:
        mydate = entry.get('date')
        if mydate:
            break

    for entry in entries:
        entry['date'] = mydate

但是,我必须单独对待每个条目而不是作为一个集合,所以我不能这样写逻辑。

此外,我们不能保证每一组 PCollection 至少有一个条目具有所需的 date 字段。这不太可能,但仍然可能发生 PCollection 上的所有条目都没有 date 字段,所以我上面写的逻辑是无效的。我需要编写某种 PTransform 将 date 值保存到 class 属性,并在某种类型的“finally”子句上设置每个条目的值。

正确的实现方式是什么?

好问题!对于批处理管道中的 Beam,您可以使用 侧输入 来完成此操作。你会做这样的事情:

def get_date(entry):
  if 'date' in entry:
    yield entry['date']

def put_date(entry, date):
  entry['date'] = date[0]
  return entry

with beam.Pipeline(...) as p:
  entries = p | ReadEntries(....)

  # First obtain the date from the entires.
  # We do this by outputting the date from the entries with a date,
  # and then we 'sample' one (this is a simple combiner that lets us
  # pick one of the dates - since they're all the same).
  date_pc = beam.pvalue.AsList(
      entries
      | beam.FlatMap(get_date)
      | beam.Combine(Sample.FixedSizeGlobally(1))

  # We then use the date_pc side input to join it into entries.
  dated_entries = entries | beam.Map(put_date, date_pc)

如果有帮助请告诉我。

如果 none 个条目具有 date 属性,那么我们可能需要做一些其他事情。很高兴迭代!

据我所知,我想在 运行 我的管道之前为我的条目设置 date 值,我已将其添加为 ValueProvier 并将其提供给我的 SetDate PTransform 作为辅助输入。如果不是你的情况,你可以试试标记为正确的答案。

这是我的最终实现:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--query',
            help='Query to retrieve from BigQuery acting as data source.')
        parser.add_value_provider_argument(
            '--mydate',
            help='Date with %Y-%m-%d format of the entries we are processing.')

options = PipelineOptions()
args = options.view_as(CustomOptions)

def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                                          query=args.query,
                                          use_standard_sql=True,
                                          flatten_results=False)
            | "Set date" >> beam.ParDo(SetDate(args.mydate))
        )

class SetDate(beam.DoFn):
    def __init__(self, mydate):
        self.mydate = mydate

    def process(self, entry):
        if 'date' not in entry:
            entry['date'] = self.mydate.get()
        yield entry