如何计算 Apache Beam 中 PCollection 的元素数量

How to calculate the number of elements of a PCollection in Apache beam

number_items = lines | 'window' >> beam.WindowInto(window.GlobalWindows()) \
    | 'CountGlobally' >> beam.combiners.Count.Globally() \
    | 'print' >> beam.ParDo(PrintFn())

我试图通过打印和日志显示它,但我什么也没找到

class PrintFn(beam.DoFn):
    def process(self, element):
        print(element)
        logging.error(element)
        return [element]

我觉得想要计算一个无界集合的元素很奇怪。我的第一感觉是永远不要追求全局 window,因为 Beam 在无界集合上等待结束......除非你执行触发器。

深入研究文档,I found this

Set a non-default trigger. This allows the global window to emit results under other conditions, since the default windowing behavior (waiting for all data to arrive) will never occur

我是对的,有了触发器,结局永远不会发生,它是无界的,无限的。

您是否尝试过跳过 window 并直接全局计数?

对于批处理,你可以简单地做

def print_row(element):
  print element

count_pcol = (
              lines
              | 'Count elements' >> beam.combiners.Count.Globally()
              | 'Print result' >> beam.Map(print_row)
            )

beam.combiners.Count.Globally() 是一个 PTransform,它使用全局组合来计算 PCollection 的所有元素并生成单个值。


对于 Streaming,计算元素是不可能的,因为源是一个无限的 pcollection,即它永远不会结束。 CombineGlobally 在你的情况下将继续等待输入并且永远不会产生输出。

一个可能的解决方案是设置一个 window 函数和一个非默认触发器。

我编写了一个简单的管道,它以 20 秒的固定 windows 划分元素,并对每个 window 的每个键进行计数。您可以根据需要更改 window 和触发。

def form_pair(data):
  return 1, data

def print_row(element):
      print element

count_pcol = (
                p 
                | 'Read from pub sub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                | 'Form key value pair' >> beam.Map(form_pair)
                | 'Apply windowing and triggers' >> 
                                       beam.WindowInto(window.FixedWindows(20),
                                       trigger=AfterProcessingTime(5), 
                                       accumulation_mode=AccumulationMode.DISCARDING)
                | 'Count elements by key' >> beam.combiners.Count.PerKey()
                | 'Print result' >> beam.Map(print_row)
               )