如何在 Apache Beam 中动态跟踪状态?
How do I keep track of state dynamically in Apache Beam?
我正在 Apache Beam 中编写数据验证脚本。每当将新文件上传到 Google Cloud Storage 时,此脚本都会收到来自 PubSub 的消息,下载文件,并对文件运行一系列预定义测试。
在这些测试结束时,我需要通过电子邮件发送所有未通过测试的行的日志。
为了不多次发送电子邮件,我做了一些阅读,并相信我可以使用 Beam 中的状态和计时器构造发送一次电子邮件。但是,每个文件都会有不同数量的错误,所以我如何设置它,以便文件发送在发送电子邮件之前需要 X 个元素,其中每个元素都是一个错误,而不是硬编码的数字。
我尝试使用带有 COUNT_STATE 的 DoFn 来计算传递给它的元素,但我得到了一个不同的错误,关于该元素是 Pcollection 而不是 K、V 元组。
管道代码如下:
with beam.Pipeline(options=pipeline_options) as p:
# Read Lines from data
validation = (p
| "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
| 'Filter Messages' >> beam.ParDo(FilterMessageDoFn(known_args.project, t_options.dataset_id))
| 'After filter' >> beam.ParDo(DebugFn("DATA VALIDATION: PROCESSING FILE...", show_trace))
| 'Generate Schemas' >> beam.ParDo(GetSchemaFn(known_args.project, t_options.validation_home_path))
| 'After GetSschema' >> beam.ParDo(DebugFn("DATA VALIDATION: After OBTAINING SCHEMA...", show_trace))
| 'Validate' >> beam.ParDo(ValidateFn(known_args.project)).with_outputs(
ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE,
ValidateFn.TAG_VALIDATION_CONTENT_FAILURE,
ValidateFn.TAG_VALIDATION_CONTENT_SUCCESS,
main='lines')
to_be_joined = ([validation[ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE],
validation[ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]]
| "Group By Key" >> beam.Flatten()
| 'Persist Global Errors to Big Query' >> beam.ParDo(PersistErrorsFn(known_args.project))
| 'Debug Errors' >> beam.ParDo(DebugFn("DATA VALIDATION: VALIDATION ERRORS", show_trace))
| 'Save Global Errors' >> beam.io.WriteToBigQuery('data_management.validation_errors',
project=known_args.project,
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
基本上我想在写入 BigQuery 之前插入一个步骤以发送一封电子邮件,该电子邮件仅在收到 VALIDATION_GLOBAL_FAILURE + VALIDATION_CONTENT_FAILURE 个错误时发送。
谢谢!
您的想法是,您希望对包含验证失败的两个 PCollections
执行 CoGroupByKey
,然后应用 DoFn
,将您的电子邮件发送逻辑应用于结果。
不清楚管道中的类型是什么,但我假设 ValidateFn
输出一个 (file name, validation error)
元组到 ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE
和 ValidateFn.TAG_VALIDATION_CONTENT_FAILURE
。
class SendEmail(beam.DoFn):
def process(self, element):
file_name = element[0]
iterable_of_global_failures = element[1].get(ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE)
iterable_of_content_failures = element[1].get(ValidateFn.TAG_VALIDATION_CONTENT_FAILURE)
... format and send e-mail if iterables satisfy requirements ...
# create a dict containing the tag to PCollection mapping for what we want to group together.
validation = (p
| "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
| 'WindowInto' >> beam.WindowInto(FixedWindows(1))
| ...
validation_errors = {key: validation[key] for key in [ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE, ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]}
(validation_errors
| 'CoGroupByKey' >> beam.CoGroupByKey()
| 'Send Email' >> beam.ParDo(SendEmail())
由于来自 PubsubIO 的每个输入记录都代表文件名,并且稍后会扩展为所有相关记录,因此这些记录将共享文件所属的 PubsubIO 消息的相同时间戳。这允许我们在分组时使用非常小的 window 大小,从而导致更小的组和更好的性能。指定 WindowInto
是必要的,这样我们就不会使用 GlobalWindow
,因为 CoGroupByKey
永远不会触发输出发生。您可以详细了解流式传输、windowing 和触发[1, 2]。
我正在 Apache Beam 中编写数据验证脚本。每当将新文件上传到 Google Cloud Storage 时,此脚本都会收到来自 PubSub 的消息,下载文件,并对文件运行一系列预定义测试。 在这些测试结束时,我需要通过电子邮件发送所有未通过测试的行的日志。
为了不多次发送电子邮件,我做了一些阅读,并相信我可以使用 Beam 中的状态和计时器构造发送一次电子邮件。但是,每个文件都会有不同数量的错误,所以我如何设置它,以便文件发送在发送电子邮件之前需要 X 个元素,其中每个元素都是一个错误,而不是硬编码的数字。
我尝试使用带有 COUNT_STATE 的 DoFn 来计算传递给它的元素,但我得到了一个不同的错误,关于该元素是 Pcollection 而不是 K、V 元组。
管道代码如下:
with beam.Pipeline(options=pipeline_options) as p:
# Read Lines from data
validation = (p
| "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
| 'Filter Messages' >> beam.ParDo(FilterMessageDoFn(known_args.project, t_options.dataset_id))
| 'After filter' >> beam.ParDo(DebugFn("DATA VALIDATION: PROCESSING FILE...", show_trace))
| 'Generate Schemas' >> beam.ParDo(GetSchemaFn(known_args.project, t_options.validation_home_path))
| 'After GetSschema' >> beam.ParDo(DebugFn("DATA VALIDATION: After OBTAINING SCHEMA...", show_trace))
| 'Validate' >> beam.ParDo(ValidateFn(known_args.project)).with_outputs(
ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE,
ValidateFn.TAG_VALIDATION_CONTENT_FAILURE,
ValidateFn.TAG_VALIDATION_CONTENT_SUCCESS,
main='lines')
to_be_joined = ([validation[ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE],
validation[ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]]
| "Group By Key" >> beam.Flatten()
| 'Persist Global Errors to Big Query' >> beam.ParDo(PersistErrorsFn(known_args.project))
| 'Debug Errors' >> beam.ParDo(DebugFn("DATA VALIDATION: VALIDATION ERRORS", show_trace))
| 'Save Global Errors' >> beam.io.WriteToBigQuery('data_management.validation_errors',
project=known_args.project,
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
基本上我想在写入 BigQuery 之前插入一个步骤以发送一封电子邮件,该电子邮件仅在收到 VALIDATION_GLOBAL_FAILURE + VALIDATION_CONTENT_FAILURE 个错误时发送。
谢谢!
您的想法是,您希望对包含验证失败的两个 PCollections
执行 CoGroupByKey
,然后应用 DoFn
,将您的电子邮件发送逻辑应用于结果。
不清楚管道中的类型是什么,但我假设 ValidateFn
输出一个 (file name, validation error)
元组到 ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE
和 ValidateFn.TAG_VALIDATION_CONTENT_FAILURE
。
class SendEmail(beam.DoFn):
def process(self, element):
file_name = element[0]
iterable_of_global_failures = element[1].get(ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE)
iterable_of_content_failures = element[1].get(ValidateFn.TAG_VALIDATION_CONTENT_FAILURE)
... format and send e-mail if iterables satisfy requirements ...
# create a dict containing the tag to PCollection mapping for what we want to group together.
validation = (p
| "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
| 'WindowInto' >> beam.WindowInto(FixedWindows(1))
| ...
validation_errors = {key: validation[key] for key in [ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE, ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]}
(validation_errors
| 'CoGroupByKey' >> beam.CoGroupByKey()
| 'Send Email' >> beam.ParDo(SendEmail())
由于来自 PubsubIO 的每个输入记录都代表文件名,并且稍后会扩展为所有相关记录,因此这些记录将共享文件所属的 PubsubIO 消息的相同时间戳。这允许我们在分组时使用非常小的 window 大小,从而导致更小的组和更好的性能。指定 WindowInto
是必要的,这样我们就不会使用 GlobalWindow
,因为 CoGroupByKey
永远不会触发输出发生。您可以详细了解流式传输、windowing 和触发[1, 2]。