从 FlinkKafkaConsumer 迁移到 KafkaSource,没有执行 windows

Migrating from FlinkKafkaConsumer to KafkaSource, no windows executed

我是kafka和flink初学者。 我已经实现 FlinkKafkaConsumer 来消费来自 kafka-topic 的消息。除了“组”和“主题”之外,唯一的自定义设置是 (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 以允许多次重新阅读相同的消息。它开箱即用,适用于消费和逻辑。 现在 FlinkKafkaConsumer 被弃用了,我想换成继任者 KafkaSource

使用与我相同的参数初始化 KafkaSource FlinkKafkaConsumer 会按预期读取主题,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。但是 windows 的执行没有完成,因此不会产生任何结果。

我假设 KafkaSource 中的一些默认设置与 FlinkKafkaConsumer 中的不同,但我不知道它们可能是什么。

KafkaSource - 不工作

 KafkaSource<TestData> source = KafkaSource.<TestData>builder()
                .setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
                .setTopics(TOPIC)
                .setDeserializer(new CustomDeserializer())
                .setGroupId(GROUP_ID)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

        DataStream<TestData> testDataStreamSource = env.fromSource(
                source,
                WatermarkStrategy.
                        <TestData>noWatermarks(),
                "Kafka Source"
        );

Kafka 消费者 - Working(属性包含 group.idbootstrap.serverszookeeper.connect

    propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new  FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
    DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)

两个流都使用相同的管道,如下所示

testDataStreamSource
           .assignTimestampsAndWatermarks(WatermarkStrategy.
                        <TestData>forMonotonousTimestamps().
                        withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
            .keyBy(TestData::getKey)
            .window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
            .process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
                    @Override
                    public void process(
            ....
            });

尝试过的东西

更新:答案是,在 Kafka 分区数小于 Flink 的 kafka 源运算符的并行度的情况下,KafkaSource 的行为与 FlinkKafkaConsumer 不同。有关详细信息,请参阅

原回答:

问题几乎可以肯定与时间戳和水印有关。

要验证时间戳和水印是问题所在,您可以做一个快速实验,将 3 小时长的事件时间滑动 windows 替换为较短的处理时间滚动 windows。

一般来说,最好(但不是必需)让 KafkaSource 做水印。在源之后应用的水印生成器中使用 forMonotonousTimestamps,就像您现在所做的那样,是一个冒险的举动。只有当源的每个并行实例使用的所有分区中的时间戳按顺序处理时,这才会正常工作。如果将多个 Kafka 分区分配给任何 KafkaSource 任务,则不会发生这种情况。另一方面,如果您在 fromSource 调用中提供 forMonotonousTimestamps 水印策略(而不是 noWatermarks),那么所需要的只是时间戳在每个分区的基础上按顺序排列,我想是这样的。

尽管如此令人不安,但可能不足以解释为什么 windows 没有产生任何结果。另一个可能的根本原因是测试数据集不包含任何在第一个 window 之后带有时间戳的事件,因此 window 永远不会关闭。

你有水槽吗?如果不是,那就可以解释事情了。

您可以使用 Flink 仪表板来帮助调试它。查看水印是否在 window 任务中推进。打开检查点,然后查看 window 任务有多少状态——它应该有一些非零状态。