如何在 Apache Beam 管道中记录传入消息
How to log incoming messages in apache beam pipeline
我正在编写一个简单的 apache 束流管道,从 pubsub 主题获取输入并将其存储到 bigquery 中。几个小时以来,我认为我什至无法阅读消息,因为我只是想将输入记录到控制台:
events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)
当我将其写入文本时,它工作正常!然而,我对 logger
的调用从未发生过。
人们如何开发/调试这些流媒体管道?
我尝试添加以下行:
events | 'Log' >> logging.info(events)
使用 print()
也不会在控制台中产生任何结果。
这是因为 events
是 PCollection
,所以您需要对其应用 PTransform
。
最简单的方法是将 ParDo
应用于 events
:
events | 'Log results' >> beam.ParDo(LogResults())
定义为:
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("Pub/Sub event: %s", element)
yield element
请注意,如果您想在下游应用进一步的步骤(例如在记录元素后写入接收器),我还会生成该元素。例如,参见问题。
我正在编写一个简单的 apache 束流管道,从 pubsub 主题获取输入并将其存储到 bigquery 中。几个小时以来,我认为我什至无法阅读消息,因为我只是想将输入记录到控制台:
events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)
当我将其写入文本时,它工作正常!然而,我对 logger
的调用从未发生过。
人们如何开发/调试这些流媒体管道?
我尝试添加以下行:
events | 'Log' >> logging.info(events)
使用 print()
也不会在控制台中产生任何结果。
这是因为 events
是 PCollection
,所以您需要对其应用 PTransform
。
最简单的方法是将 ParDo
应用于 events
:
events | 'Log results' >> beam.ParDo(LogResults())
定义为:
class LogResults(beam.DoFn):
"""Just log the results"""
def process(self, element):
logging.info("Pub/Sub event: %s", element)
yield element
请注意,如果您想在下游应用进一步的步骤(例如在记录元素后写入接收器),我还会生成该元素。例如,参见问题