从 FlinkKafkaConsumer 迁移到 KafkaSource,没有执行 windows
Migrating from FlinkKafkaConsumer to KafkaSource, no windows executed
我是kafka和flink初学者。
我已经实现 FlinkKafkaConsumer
来消费来自 kafka-topic 的消息。除了“组”和“主题”之外,唯一的自定义设置是 (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
以允许多次重新阅读相同的消息。它开箱即用,适用于消费和逻辑。
现在 FlinkKafkaConsumer
被弃用了,我想换成继任者 KafkaSource
。
使用与我相同的参数初始化 KafkaSource
FlinkKafkaConsumer
会按预期读取主题,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。但是 windows 的执行没有完成,因此不会产生任何结果。
我假设 KafkaSource
中的一些默认设置与 FlinkKafkaConsumer
中的不同,但我不知道它们可能是什么。
KafkaSource - 不工作
KafkaSource<TestData> source = KafkaSource.<TestData>builder()
.setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
.setTopics(TOPIC)
.setDeserializer(new CustomDeserializer())
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
DataStream<TestData> testDataStreamSource = env.fromSource(
source,
WatermarkStrategy.
<TestData>noWatermarks(),
"Kafka Source"
);
Kafka 消费者 - Working(属性包含 group.id
、bootstrap.servers
和 zookeeper.connect
)
propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)
两个流都使用相同的管道,如下所示
testDataStreamSource
.assignTimestampsAndWatermarks(WatermarkStrategy.
<TestData>forMonotonousTimestamps().
withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
.keyBy(TestData::getKey)
.window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
.process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
@Override
public void process(
....
});
尝试过的东西
- 我尝试过设置偏移量提交,但它
情况没有改善。
- 正在设置源中已有的时间戳。
更新:答案是,在 Kafka 分区数小于 Flink 的 kafka 源运算符的并行度的情况下,KafkaSource 的行为与 FlinkKafkaConsumer 不同。有关详细信息,请参阅 。
原回答:
问题几乎可以肯定与时间戳和水印有关。
要验证时间戳和水印是问题所在,您可以做一个快速实验,将 3 小时长的事件时间滑动 windows 替换为较短的处理时间滚动 windows。
一般来说,最好(但不是必需)让 KafkaSource 做水印。在源之后应用的水印生成器中使用 forMonotonousTimestamps
,就像您现在所做的那样,是一个冒险的举动。只有当源的每个并行实例使用的所有分区中的时间戳按顺序处理时,这才会正常工作。如果将多个 Kafka 分区分配给任何 KafkaSource 任务,则不会发生这种情况。另一方面,如果您在 fromSource 调用中提供 forMonotonousTimestamps
水印策略(而不是 noWatermarks
),那么所需要的只是时间戳在每个分区的基础上按顺序排列,我想是这样的。
尽管如此令人不安,但可能不足以解释为什么 windows 没有产生任何结果。另一个可能的根本原因是测试数据集不包含任何在第一个 window 之后带有时间戳的事件,因此 window 永远不会关闭。
你有水槽吗?如果不是,那就可以解释事情了。
您可以使用 Flink 仪表板来帮助调试它。查看水印是否在 window 任务中推进。打开检查点,然后查看 window 任务有多少状态——它应该有一些非零状态。
我是kafka和flink初学者。
我已经实现 FlinkKafkaConsumer
来消费来自 kafka-topic 的消息。除了“组”和“主题”之外,唯一的自定义设置是 (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
以允许多次重新阅读相同的消息。它开箱即用,适用于消费和逻辑。
现在 FlinkKafkaConsumer
被弃用了,我想换成继任者 KafkaSource
。
使用与我相同的参数初始化 KafkaSource
FlinkKafkaConsumer
会按预期读取主题,我可以通过打印流来验证这一点。反序列化和时间戳似乎工作正常。但是 windows 的执行没有完成,因此不会产生任何结果。
我假设 KafkaSource
中的一些默认设置与 FlinkKafkaConsumer
中的不同,但我不知道它们可能是什么。
KafkaSource - 不工作
KafkaSource<TestData> source = KafkaSource.<TestData>builder()
.setBootstrapServers(propertiesForKafka.getProperty("bootstrap.servers"))
.setTopics(TOPIC)
.setDeserializer(new CustomDeserializer())
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
DataStream<TestData> testDataStreamSource = env.fromSource(
source,
WatermarkStrategy.
<TestData>noWatermarks(),
"Kafka Source"
);
Kafka 消费者 - Working(属性包含 group.id
、bootstrap.servers
和 zookeeper.connect
)
propertiesForKafka.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
FlinkKafkaConsumer<TestData> flinkKafkaConsumer = new FlinkKafkaConsumer(TOPIC, new CustomDeserializer(), propertiesForKafka);
DataStreamSource<TestData> testDataStreamSource = env.addSource(flinkKafkaConsumer)
两个流都使用相同的管道,如下所示
testDataStreamSource
.assignTimestampsAndWatermarks(WatermarkStrategy.
<TestData>forMonotonousTimestamps().
withTimestampAssigner((event, timestamp) -> event.getTimestamp()))
.keyBy(TestData::getKey)
.window(SlidingEventTimeWindows.of(Time.hours(3), Time.hours(1)))
.process(new ProcessWindowFunction<TestData, TestDataOutput, String, TimeWindow>() {
@Override
public void process(
....
});
尝试过的东西
- 我尝试过设置偏移量提交,但它 情况没有改善。
- 正在设置源中已有的时间戳。
更新:答案是,在 Kafka 分区数小于 Flink 的 kafka 源运算符的并行度的情况下,KafkaSource 的行为与 FlinkKafkaConsumer 不同。有关详细信息,请参阅
原回答:
问题几乎可以肯定与时间戳和水印有关。
要验证时间戳和水印是问题所在,您可以做一个快速实验,将 3 小时长的事件时间滑动 windows 替换为较短的处理时间滚动 windows。
一般来说,最好(但不是必需)让 KafkaSource 做水印。在源之后应用的水印生成器中使用 forMonotonousTimestamps
,就像您现在所做的那样,是一个冒险的举动。只有当源的每个并行实例使用的所有分区中的时间戳按顺序处理时,这才会正常工作。如果将多个 Kafka 分区分配给任何 KafkaSource 任务,则不会发生这种情况。另一方面,如果您在 fromSource 调用中提供 forMonotonousTimestamps
水印策略(而不是 noWatermarks
),那么所需要的只是时间戳在每个分区的基础上按顺序排列,我想是这样的。
尽管如此令人不安,但可能不足以解释为什么 windows 没有产生任何结果。另一个可能的根本原因是测试数据集不包含任何在第一个 window 之后带有时间戳的事件,因此 window 永远不会关闭。
你有水槽吗?如果不是,那就可以解释事情了。
您可以使用 Flink 仪表板来帮助调试它。查看水印是否在 window 任务中推进。打开检查点,然后查看 window 任务有多少状态——它应该有一些非零状态。