水印策略不适用于 Flink 中的 Kafka Consumer

Watermark strategy not working for Kafka Consumer in Flink

我是Flink的新手,所以在Flink中定义水印时遇到了一些问题。

让我们从 Kafka 消费者开始。使用的反序列化是JSONKeyValueDeserializationSchema,所以没有自定义解析。

val kafkaConsumer: FlinkKafkaConsumer[ObjectNode] = new FlinkKafkaConsumer[ObjectNode](
  kafkaTopic,
  new JSONKeyValueDeserializationSchema(false),
  properties
)

如果将接收器应用到此代码,它就可以正常工作。问题是需要水印来避免乱序事件。这就是我写的策略:

 val watermarkStrategy: WatermarkStrategy[ObjectNode] = WatermarkStrategy
  .forBoundedOutOfOrderness[ObjectNode](Duration.ofSeconds(100))
  .withTimestampAssigner(
    new SerializableTimestampAssigner[ObjectNode] {
      override def extractTimestamp(record: ObjectNode, recordTimestamp: Long): Long = {
        Instant.parse(record.get("value").get("content").get("timestamp").asText()).getEpochSecond
      }
    })

经过一些研究,我最终得到了这段代码,但这行不通。这些是我的问题:

问题出在我的解析上。

dateFormat.parse(record.get("value").get("content").get("timestamp").asText()).getTime