使用有界 Kafka 数据源时,如何在 Flink 中测量直通输出和延迟?

How to measure through-output and latency in Flink when using a bounded Kafka data source?

我有一个 Flink 应用程序,想评估应用程序的性能。

我的计划是在实验中使用有界历史数据集作为数据源,通过测量总时间成本,我可以得到直通输出值。通过查询延迟指标,我可以获得延迟情况。

但是,当使用Kafka作为数据源时,在消耗了特定数量的数据后,应用程序似乎还在等待其他数据被消耗而没有转向完成状态。我认为在等待的过程中,through输出值和延迟可能会变低,这与应用程序的性能无关。

在这种情况下,如何获得与我的有界数据集相关的精确时间成本和延迟条件,并避免来自额外等待阶段的影响?

如果您使用 KafkaSource 而不是 FlinkKafkaConsumer,您可以使用 setBounded 指定结束偏移量,如

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(...)
        .setGroupId(...)
        .setTopics(...)
        .setDeserializer(...)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .build();

env.fromSource(source, watermarkStrategy, "name"));

这样,使用 Kafka 作为源的有界作业将干净地结束。

如果你需要用 FlinkKafkaConsumer 完成同样的事情,那么实现 DeserializationSchema(或 KafkaDeserializationSchema),在某个时候 returns TRUE 来自 isEndOfStream.