仅在 window 收盘时触发 Beam ParDo
Trigger Beam ParDo at window closing only
我有一个从 Kafka 读取事件的管道。我只想在 window 关闭时计算并记录事件计数。通过这样做,我将在每个 window 上每个 Kafka partition/shard 只有一个输出日志。我在 header 中使用时间戳,我将其截断为小时以创建 collection 小时时间戳。我按小时对时间戳进行分组,然后记录每小时的时间戳并进行计数。此日志将发送到 Grafana 以创建包含计数的仪表板。
下面是我如何从 Kafka 获取数据以及它定义 window 持续时间的位置:
int windowDuration = 5;
p.apply("Read from Kafka",KafkaIO.<byte[], GenericRecord>read()
.withBootstrapServers(options.getSourceBrokers().get())
.withTopics(srcTopics)
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider
.of(options.getSchemaRegistryUrl().get(), options.getSubject().get()))
.commitOffsetsInFinalize())
.apply("Windowing of " + windowDuration +" seconds" ,
Window.<KafkaRecord<byte[], GenericRecord>>into(
FixedWindows.of(Duration.standardSeconds(windowDuration))));
管道中的下一步是从上面的 collection 中生成两个 collection,一个具有事件作为 GenericRecord,另一个具有每小时时间戳,见下文。我想要一个触发器(我相信)只应用两个 collection 保持计数。这样它每 window 只打印一次计数。目前,它每次从 Kafka 读取创建大量条目时都会打印一个计数。
tuplePCollection.get(createdOnTupleTag)
.apply(Count.perElement())
.apply( MapElements.into(TypeDescriptors.strings())
.via( (KV<Long,Long> recordCount) -> recordCount.getKey() +
": " + recordCount.getValue()))
.apply( ParDo.of(new LoggerFn.logRecords<String>()));
这是我用来记录计数的 DoFn:
class LoggerFn<T> extends DoFn<T, T> {
@ProcessElement
public void process(ProcessContext c) {
T e = (T)c.element();
LOGGER.info(e);
c.output(e);
}
}
您可以使用触发器“Window.ClosingBehavior”。您需要指定在 window 永久关闭时将创建最终窗格的条件。您可以使用这些选项:
FIRE_ALWAYS:始终触发最后一个窗格。
FIRE_IF_NON_EMPTY:如果有新数据,则仅触发最后一个窗格
之前的射击。
你可以看到这个例子。
// We first specify to never emit any panes
.triggering(Never.ever())
// We then specify to fire always when closing the window. This will emit a
// single final pane at the end of allowedLateness
.withAllowedLateness(allowedLateness, Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes())
您可以查看有关此内容的更多信息trigger。
我有一个从 Kafka 读取事件的管道。我只想在 window 关闭时计算并记录事件计数。通过这样做,我将在每个 window 上每个 Kafka partition/shard 只有一个输出日志。我在 header 中使用时间戳,我将其截断为小时以创建 collection 小时时间戳。我按小时对时间戳进行分组,然后记录每小时的时间戳并进行计数。此日志将发送到 Grafana 以创建包含计数的仪表板。
下面是我如何从 Kafka 获取数据以及它定义 window 持续时间的位置:
int windowDuration = 5;
p.apply("Read from Kafka",KafkaIO.<byte[], GenericRecord>read()
.withBootstrapServers(options.getSourceBrokers().get())
.withTopics(srcTopics)
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider
.of(options.getSchemaRegistryUrl().get(), options.getSubject().get()))
.commitOffsetsInFinalize())
.apply("Windowing of " + windowDuration +" seconds" ,
Window.<KafkaRecord<byte[], GenericRecord>>into(
FixedWindows.of(Duration.standardSeconds(windowDuration))));
管道中的下一步是从上面的 collection 中生成两个 collection,一个具有事件作为 GenericRecord,另一个具有每小时时间戳,见下文。我想要一个触发器(我相信)只应用两个 collection 保持计数。这样它每 window 只打印一次计数。目前,它每次从 Kafka 读取创建大量条目时都会打印一个计数。
tuplePCollection.get(createdOnTupleTag)
.apply(Count.perElement())
.apply( MapElements.into(TypeDescriptors.strings())
.via( (KV<Long,Long> recordCount) -> recordCount.getKey() +
": " + recordCount.getValue()))
.apply( ParDo.of(new LoggerFn.logRecords<String>()));
这是我用来记录计数的 DoFn:
class LoggerFn<T> extends DoFn<T, T> {
@ProcessElement
public void process(ProcessContext c) {
T e = (T)c.element();
LOGGER.info(e);
c.output(e);
}
}
您可以使用触发器“Window.ClosingBehavior”。您需要指定在 window 永久关闭时将创建最终窗格的条件。您可以使用这些选项:
FIRE_ALWAYS:始终触发最后一个窗格。
FIRE_IF_NON_EMPTY:如果有新数据,则仅触发最后一个窗格 之前的射击。
你可以看到这个例子。
// We first specify to never emit any panes
.triggering(Never.ever())
// We then specify to fire always when closing the window. This will emit a
// single final pane at the end of allowedLateness
.withAllowedLateness(allowedLateness, Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes())
您可以查看有关此内容的更多信息trigger。