如何将重复数据删除添加到流管道 [apache-beam]

How to add de-duplication to a streaming pipeline [apache-beam]

我在 apache beam [python] 中有一个工作流管道,它从 pub/sub 摄取数据,在数据流中执行丰富并将其传递给大查询。

使用流式传输 window,我想确保消息不会重复,(因为 pub/sub 保证至少只发送一次)。

所以,我想我应该使用 beam 中的 distinct 方法,但是一旦我使用它,我的管道就会中断(无法继续进行,任何本地打印也不可见)。

这是我的管道代码:

    with beam.Pipeline(options=options) as p:
        message = (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=known_args.topic).
                   with_output_types(bytes))

        bq_data = (message | "Decode" >> beam.FlatMap(lambda x: [x.decode('utf-8')])
                           | "Deduplication" >> beam.Distinct()
                           | "JSONLoad" >> beam.ParDo(ReadAsJSON())
                           | "Windowing" >> beam.WindowInto(window.FixedWindows(10, 0))
                           | "KeepRelevantData" >> beam.ParDo(KeepRelevantData())
                           | "PreProcessing" >> beam.ParDo(PreProcessing())
                           | "SendLimitedKeys" >> beam.ParDo(SendLimitedKeys(), schema=schema)
                   )

        if not known_args.local:
            bq_data | "WriteToBigQuery" >> beam.io.WriteToBigQuery(table=known_args.bq_table, schema=schema)
        else:
            bq_data | "Display" >> beam.ParDo(Display())

正如你在去重标签中看到的,我正在调用beam.Distinct方法。

问题:

  1. 去重应该在管道的什么地方进行?

  2. 这甚至是 correct/sane 方法吗?

  3. 我还能如何删除重复的流缓冲数据?

  4. 是否需要重复数据删除,还是我只是在浪费时间?

任何解决方案或建议将不胜感激。谢谢

您可能会发现 Exactly-once processing 上的这篇博客很有帮助。首先,Dataflow 已经根据 pub/sub 记录 ID 执行重复数据删除。但是,正如博客所述:"In some cases however, this is not quite enough. The user’s publishing process might retry publishes".

因此,如果向 Pub/Sub 发布消息的系统可能会多次发布同一消息,那么您可能希望添加自己的确定性记录 ID。然后 Cloud Dataflow 将检测到这些。这是我推荐的方法,而不是尝试在您自己的管道中进行重复数据删除。

您可以使用 withIdAttribute on PubSubIO.Read. Example

一些关于为什么我认为 Distinct 会导致卡住的解释。 Distinct 尝试删除 Window 中的重复数据。我相信您正在尝试对全局 window 进行重复数据删除,因此您的管道必须缓冲并比较所有元素,因为这是一个无界的 PCollection。它将尝试永远缓冲。

我相信如果您先执行 windowing 并且您有确定的事件时间戳(看起来您没有使用 withTimestampAttribute),这将正常工作。然后 Distinct 将仅应用于 window 中的元素(具有相同时间戳的相同元素将被放入相同的 window 中)。您可能想看看这是否适用于原型设计,但我建议尽可能添加唯一的记录 ID,并允许 Dataflow 根据记录 ID 处理重复以获得最佳性能。