水印策略不适用于 Flink 中的 Kafka Consumer
Watermark strategy not working for Kafka Consumer in Flink
我是Flink的新手,所以在Flink中定义水印时遇到了一些问题。
让我们从 Kafka 消费者开始。使用的反序列化是JSONKeyValueDeserializationSchema,所以没有自定义解析。
val kafkaConsumer: FlinkKafkaConsumer[ObjectNode] = new FlinkKafkaConsumer[ObjectNode](
kafkaTopic,
new JSONKeyValueDeserializationSchema(false),
properties
)
如果将接收器应用到此代码,它就可以正常工作。问题是需要水印来避免乱序事件。这就是我写的策略:
val watermarkStrategy: WatermarkStrategy[ObjectNode] = WatermarkStrategy
.forBoundedOutOfOrderness[ObjectNode](Duration.ofSeconds(100))
.withTimestampAssigner(
new SerializableTimestampAssigner[ObjectNode] {
override def extractTimestamp(record: ObjectNode, recordTimestamp: Long): Long = {
Instant.parse(record.get("value").get("content").get("timestamp").asText()).getEpochSecond
}
})
经过一些研究,我最终得到了这段代码,但这行不通。这些是我的问题:
- 在这里使用 ObjectNode 是最佳选择?还有其他选择吗?
- 字段timestamp是ISO 8601标准中的字符串,所以我要解析成long。这是最好的方法吗?有没有更好的办法?
- SerializableTimestampAssigner 和 forBoundedOutOfOrderness 是否正确使用?
问题出在我的解析上。
dateFormat.parse(record.get("value").get("content").get("timestamp").asText()).getTime
我是Flink的新手,所以在Flink中定义水印时遇到了一些问题。
让我们从 Kafka 消费者开始。使用的反序列化是JSONKeyValueDeserializationSchema,所以没有自定义解析。
val kafkaConsumer: FlinkKafkaConsumer[ObjectNode] = new FlinkKafkaConsumer[ObjectNode](
kafkaTopic,
new JSONKeyValueDeserializationSchema(false),
properties
)
如果将接收器应用到此代码,它就可以正常工作。问题是需要水印来避免乱序事件。这就是我写的策略:
val watermarkStrategy: WatermarkStrategy[ObjectNode] = WatermarkStrategy
.forBoundedOutOfOrderness[ObjectNode](Duration.ofSeconds(100))
.withTimestampAssigner(
new SerializableTimestampAssigner[ObjectNode] {
override def extractTimestamp(record: ObjectNode, recordTimestamp: Long): Long = {
Instant.parse(record.get("value").get("content").get("timestamp").asText()).getEpochSecond
}
})
经过一些研究,我最终得到了这段代码,但这行不通。这些是我的问题:
- 在这里使用 ObjectNode 是最佳选择?还有其他选择吗?
- 字段timestamp是ISO 8601标准中的字符串,所以我要解析成long。这是最好的方法吗?有没有更好的办法?
- SerializableTimestampAssigner 和 forBoundedOutOfOrderness 是否正确使用?
问题出在我的解析上。
dateFormat.parse(record.get("value").get("content").get("timestamp").asText()).getTime