仅在 window 收盘时触发 Beam ParDo

Trigger Beam ParDo at window closing only

我有一个从 Kafka 读取事件的管道。我只想在 window 关闭时计算并记录事件计数。通过这样做,我将在每个 window 上每个 Kafka partition/shard 只有一个输出日志。我在 header 中使用时间戳,我将其截断为小时以创建 collection 小时时间戳。我按小时对时间戳进行分组,然后记录每小时的时间戳并进行计数。此日志将发送到 Grafana 以创建包含计数的仪表板。

下面是我如何从 Kafka 获取数据以及它定义 window 持续时间的位置:

int windowDuration = 5;
p.apply("Read from Kafka",KafkaIO.<byte[], GenericRecord>read()
            .withBootstrapServers(options.getSourceBrokers().get())
            .withTopics(srcTopics)
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider
            .of(options.getSchemaRegistryUrl().get(), options.getSubject().get()))
                    .commitOffsetsInFinalize())
  .apply("Windowing of " + windowDuration +" seconds" , 
            Window.<KafkaRecord<byte[], GenericRecord>>into(
            FixedWindows.of(Duration.standardSeconds(windowDuration))));

管道中的下一步是从上面的 collection 中生成两个 collection,一个具有事件作为 GenericRecord,另一个具有每小时时间戳,见下文。我想要一个触发器(我相信)只应用两个 collection 保持计数。这样它每 window 只打印一次计数。目前,它每次从 Kafka 读取创建大量条目时都会打印一个计数。

  tuplePCollection.get(createdOnTupleTag)
  .apply(Count.perElement())
  .apply( MapElements.into(TypeDescriptors.strings())
  .via( (KV<Long,Long> recordCount) -> recordCount.getKey() +
    ": " + recordCount.getValue()))
  .apply( ParDo.of(new LoggerFn.logRecords<String>()));

这是我用来记录计数的 DoFn:

 class LoggerFn<T> extends DoFn<T, T> {
        @ProcessElement
        public void process(ProcessContext c) {
            T e = (T)c.element();
            LOGGER.info(e);
            c.output(e);
        }
    }

您可以使用触发器“Window.ClosingBehavior”。您需要指定在 window 永久关闭时将创建最终窗格的条件。您可以使用这些选项:

  • FIRE_ALWAYS:始终触发最后一个窗格。

  • FIRE_IF_NON_EMPTY:如果有新数据,则仅触发最后一个窗格 之前的射击。

你可以看到这个例子。

// We first specify to never emit any panes
 .triggering(Never.ever())
 
 // We then specify to fire always when closing the window. This will emit a
 // single final pane at the end of allowedLateness
 .withAllowedLateness(allowedLateness, Window.ClosingBehavior.FIRE_ALWAYS)
 .discardingFiredPanes())

您可以查看有关此内容的更多信息trigger