flink 的 Kafka 水印策略在我的应用程序中不起作用
flink's Kafka Watermark Strategies don't work in my application
我用的是flink 1.13.0版本
当我尝试通过似乎不起作用的 flink 文档使用 Kafka 水印策略时,window-process 函数将不会 运行。
而我想通过这种方式知道,水印的时间戳在kafka中使用的是消费时间还是生产时间?
我的消费者代码是这样的:
val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
.setCommitOffsetsOnCheckpoints(true)
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
stream = env.addSource(source)
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
并像这样使用 window:
processStream
.keyBy(_.num)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
.addSink(new SignLatSink(serverConfig.smsRuleRedis))
.name("lat_count_sink")
.uid("lat_count_sink")
还有这样的拓扑图:
由于您没有在水印策略中指定时间戳分配器,因此您依赖 FlinkKafkaConsumer 为流记录分配时间戳。只有当从 Kafka 读取的记录在 headers 中有时间戳时,这才有效。否则,您将需要实施时间戳分配器以从事件中提取时间戳。
请注意,您将无法实现 FlinkKafkaConsumer 可以应用的时间戳分配器,除非您还实现了 FlinkKafkaConsumer 可以用来生成 objects 的反序列化器,然后可以提取时间戳。否则,您可以选择在源之后的某处应用水印策略。
如果缺少时间戳不是问题,则可能还有其他问题。例如,您可能有一个空闲的 Kafka 分区,或者缺少足够远的事件来关闭 window.
顺便说一句,如果您的事件是按 per-partition 顺序排列的,并且如果您在 FlinkKafkaConsumer 上调用了 assignTimestampsAndWatermarks(您目前正在这样做),那么您可以使用 forMonotonousTimestamps
而不是比 forBoundedOutOfOrderness
有一些明显的优势。
我用的是flink 1.13.0版本
当我尝试通过似乎不起作用的 flink 文档使用 Kafka 水印策略时,window-process 函数将不会 运行。
而我想通过这种方式知道,水印的时间戳在kafka中使用的是消费时间还是生产时间?
我的消费者代码是这样的:
val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
.setCommitOffsetsOnCheckpoints(true)
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
stream = env.addSource(source)
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
并像这样使用 window:
processStream
.keyBy(_.num)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)
.addSink(new SignLatSink(serverConfig.smsRuleRedis))
.name("lat_count_sink")
.uid("lat_count_sink")
还有这样的拓扑图:
由于您没有在水印策略中指定时间戳分配器,因此您依赖 FlinkKafkaConsumer 为流记录分配时间戳。只有当从 Kafka 读取的记录在 headers 中有时间戳时,这才有效。否则,您将需要实施时间戳分配器以从事件中提取时间戳。
请注意,您将无法实现 FlinkKafkaConsumer 可以应用的时间戳分配器,除非您还实现了 FlinkKafkaConsumer 可以用来生成 objects 的反序列化器,然后可以提取时间戳。否则,您可以选择在源之后的某处应用水印策略。
如果缺少时间戳不是问题,则可能还有其他问题。例如,您可能有一个空闲的 Kafka 分区,或者缺少足够远的事件来关闭 window.
顺便说一句,如果您的事件是按 per-partition 顺序排列的,并且如果您在 FlinkKafkaConsumer 上调用了 assignTimestampsAndWatermarks(您目前正在这样做),那么您可以使用 forMonotonousTimestamps
而不是比 forBoundedOutOfOrderness
有一些明显的优势。