使用有界 Kafka 数据源时,如何在 Flink 中测量直通输出和延迟?
How to measure through-output and latency in Flink when using a bounded Kafka data source?
我有一个 Flink 应用程序,想评估应用程序的性能。
我的计划是在实验中使用有界历史数据集作为数据源,通过测量总时间成本,我可以得到直通输出值。通过查询延迟指标,我可以获得延迟情况。
但是,当使用Kafka作为数据源时,在消耗了特定数量的数据后,应用程序似乎还在等待其他数据被消耗而没有转向完成状态。我认为在等待的过程中,through输出值和延迟可能会变低,这与应用程序的性能无关。
在这种情况下,如何获得与我的有界数据集相关的精确时间成本和延迟条件,并避免来自额外等待阶段的影响?
如果您使用 KafkaSource
而不是 FlinkKafkaConsumer
,您可以使用 setBounded
指定结束偏移量,如
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(...)
.setGroupId(...)
.setTopics(...)
.setDeserializer(...)
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
env.fromSource(source, watermarkStrategy, "name"));
这样,使用 Kafka 作为源的有界作业将干净地结束。
如果你需要用 FlinkKafkaConsumer
完成同样的事情,那么实现 DeserializationSchema
(或 KafkaDeserializationSchema
),在某个时候 returns TRUE 来自 isEndOfStream
.
我有一个 Flink 应用程序,想评估应用程序的性能。
我的计划是在实验中使用有界历史数据集作为数据源,通过测量总时间成本,我可以得到直通输出值。通过查询延迟指标,我可以获得延迟情况。
但是,当使用Kafka作为数据源时,在消耗了特定数量的数据后,应用程序似乎还在等待其他数据被消耗而没有转向完成状态。我认为在等待的过程中,through输出值和延迟可能会变低,这与应用程序的性能无关。
在这种情况下,如何获得与我的有界数据集相关的精确时间成本和延迟条件,并避免来自额外等待阶段的影响?
如果您使用 KafkaSource
而不是 FlinkKafkaConsumer
,您可以使用 setBounded
指定结束偏移量,如
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(...)
.setGroupId(...)
.setTopics(...)
.setDeserializer(...)
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
env.fromSource(source, watermarkStrategy, "name"));
这样,使用 Kafka 作为源的有界作业将干净地结束。
如果你需要用 FlinkKafkaConsumer
完成同样的事情,那么实现 DeserializationSchema
(或 KafkaDeserializationSchema
),在某个时候 returns TRUE 来自 isEndOfStream
.