flink 的 Kafka 水印策略在我的应用程序中不起作用

flink's Kafka Watermark Strategies don't work in my application

我用的是flink 1.13.0版本

当我尝试通过似乎不起作用的 flink 文档使用 Kafka 水印策略时,window-process 函数将不会 运行。

而我想通过这种方式知道,水印的时间戳在kafka中使用的是消费时间还是生产时间?

我的消费者代码是这样的:

val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
  .setCommitOffsetsOnCheckpoints(true)
  .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(5)))
stream = env.addSource(source)
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)

并像这样使用 window:

processStream
  .keyBy(_.num)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(new LatTimeAggregate(), new SignLatCalculateProcess())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)
  .addSink(new SignLatSink(serverConfig.smsRuleRedis))
  .name("lat_count_sink")
  .uid("lat_count_sink")

还有这样的拓扑图:

由于您没有在水印策略中指定时间戳分配器,因此您依赖 FlinkKafkaConsumer 为流记录分配时间戳。只有当从 Kafka 读取的记录在 headers 中有时间戳时,这才有效。否则,您将需要实施时间戳分配器以从事件中提取时间戳。

请注意,您将无法实现 FlinkKafkaConsumer 可以应用的时间戳分配器,除非您还实现了 FlinkKafkaConsumer 可以用来生成 objects 的反序列化器,然后可以提取时间戳。否则,您可以选择在源之后的某处应用水印策略。

如果缺少时间戳不是问题,则可能还有其他问题。例如,您可能有一个空闲的 Kafka 分区,或者缺少足够远的事件来关闭 window.

顺便说一句,如果您的事件是按 per-partition 顺序排列的,并且如果您在 FlinkKafkaConsumer 上调用了 assignTimestampsAndWatermarks(您目前正在这样做),那么您可以使用 forMonotonousTimestamps 而不是比 forBoundedOutOfOrderness 有一些明显的优势。