收到 PubSub 通知后触发数据流作业

Trigger Dataflow job upon receiving of a PubSub notification

我用 Apache Beam 写了一个数据流管道,让你对代码有一个基本的了解:

Job= (
    p
    |"cretae">>beam.Create(["message"])
    |"job 1" >> beam.ParDo(dofn1())
    |"job 2" >> beam.ParDo(dofn2())
    |"job 3" >> beam.ParDo(dofn3())
    )

目前,我通过创建随机消息来触发数据流管道,消息的内容并不重要,因为它仅用于触发管道。只是想知道是否有一种方法可以在收到 PubSub 通知时触发此管道。也许通过使用 Apache Beam Pubsub API?有人可以举个例子吗?谢谢

你是对的。您可以将数据流管道设置为从 GCP pub/sub 主题中读取。您可以直接从主题中阅读,但我建议创建一个订阅并将数据流管道与订阅连接(为什么?如果您想要重新启动管道并且不会错过任何到达主题的消息,它会防止您丢失消息在停止它和重新启动它之间)。

假设您已经设置了 GPC pub/sub 主题和订阅,请按以下步骤操作。您需要记住订阅路径。

import apache_beam as beam
import logging

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

with beam.Pipeline(options=pipeline_options) as pipeline:
    (pipeline
    | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=input_topic_subscription_path)
    | "Window into fixed intervals" >> beam.WindowInto(beam.FixedWindows(5))
    | "Log the messages" >> beam.Map(lambda message: logging.info(message))
    )

上面的代码将每 5 秒从 pub/sub 主题中读取消息,然后记录消息。