如何在 Apache Beam 中动态跟踪状态?

How do I keep track of state dynamically in Apache Beam?

我正在 Apache Beam 中编写数据验证脚本。每当将新文件上传到 Google Cloud Storage 时,此脚本都会收到来自 PubSub 的消息,下载文件,并对文件运行一系列预定义测试。 在这些测试结束时,我需要通过电子邮件发送所有未通过测试的行的日志。

为了不多次发送电子邮件,我做了一些阅读,并相信我可以使用 Beam 中的状态和计时器构造发送一次电子邮件。但是,每个文件都会有不同数量的错误,所以我如何设置它,以便文件发送在发送电子邮件之前需要 X 个元素,其中每个元素都是一个错误,而不是硬编码的数字。

我尝试使用带有 COUNT_STATE 的 DoFn 来计算传递给它的元素,但我得到了一个不同的错误,关于该元素是 Pcollection 而不是 K、V 元组。

管道代码如下:

with beam.Pipeline(options=pipeline_options) as p:
    # Read Lines from data
    validation = (p
                | "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
                | 'Filter Messages' >> beam.ParDo(FilterMessageDoFn(known_args.project, t_options.dataset_id))
                | 'After filter' >> beam.ParDo(DebugFn("DATA VALIDATION: PROCESSING FILE...", show_trace))
                | 'Generate Schemas' >> beam.ParDo(GetSchemaFn(known_args.project, t_options.validation_home_path))
                | 'After GetSschema' >> beam.ParDo(DebugFn("DATA VALIDATION: After OBTAINING SCHEMA...", show_trace))
                | 'Validate' >> beam.ParDo(ValidateFn(known_args.project)).with_outputs(
                ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE,
                ValidateFn.TAG_VALIDATION_CONTENT_FAILURE,
                ValidateFn.TAG_VALIDATION_CONTENT_SUCCESS,
                main='lines')

to_be_joined = ([validation[ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE], 
                validation[ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]]
               | "Group By Key" >> beam.Flatten()
               | 'Persist Global Errors to Big Query' >> beam.ParDo(PersistErrorsFn(known_args.project))
               | 'Debug Errors' >> beam.ParDo(DebugFn("DATA VALIDATION: VALIDATION ERRORS", show_trace))
               | 'Save Global Errors' >> beam.io.WriteToBigQuery('data_management.validation_errors',                                                                 
              project=known_args.project,
              schema=TABLE_SCHEMA, 
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)

基本上我想在写入 BigQuery 之前插入一个步骤以发送一封电子邮件,该电子邮件仅在收到 VALIDATION_GLOBAL_FAILURE + VALIDATION_CONTENT_FAILURE 个错误时发送。

谢谢!

您的想法是,您希望对包含验证失败的两个 PCollections 执行 CoGroupByKey,然后应用 DoFn,将您的电子邮件发送逻辑应用于结果。

不清楚管道中的类型是什么,但我假设 ValidateFn 输出一个 (file name, validation error) 元组到 ValidateFn.TAG_VALIDATION_GLOBAL_FAILUREValidateFn.TAG_VALIDATION_CONTENT_FAILURE

class SendEmail(beam.DoFn):
  def process(self, element):
    file_name = element[0]
    iterable_of_global_failures = element[1].get(ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE)
    iterable_of_content_failures = element[1].get(ValidateFn.TAG_VALIDATION_CONTENT_FAILURE)
    ... format and send e-mail if iterables satisfy requirements ...


# create a dict containing the tag to PCollection mapping for what we want to group together.
validation = (p
              | "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
              | 'WindowInto' >> beam.WindowInto(FixedWindows(1))
              | ...

validation_errors = {key: validation[key] for key in [ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE, ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]}

(validation_errors
 | 'CoGroupByKey' >> beam.CoGroupByKey()
 | 'Send Email' >> beam.ParDo(SendEmail())

由于来自 PubsubIO 的每个输入记录都代表文件名,并且稍后会扩展为所有相关记录,因此这些记录将共享文件所属的 PubsubIO 消息的相同时间戳。这允许我们在分组时使用非常小的 window 大小,从而导致更小的组和更好的性能。指定 WindowInto 是必要的,这样我们就不会使用 GlobalWindow,因为 CoGroupByKey 永远不会触发输出发生。您可以详细了解流式传输、windowing 和触发[1, 2]