Scala: Cannot resolve overloaded methods (Flink WatermarkStrategy)
I am following the Flink documentation on how to use a WatermarkStrategy with a KafkaConsumer. The code is as follows:
val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20)))
val stream: DataStream[MyType] = env.addSource(kafkaSource)
Whenever I try to compile the code above, I get the following error message:
error: overloaded method value assignTimestampsAndWatermarks with alternatives:
[ERROR] (x: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR] (x: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR] (x: org.apache.flink.api.common.eventtime.WatermarkStrategy[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String]
[ERROR] cannot be applied to (org.apache.flink.api.common.eventtime.WatermarkStrategy[Nothing])
[ERROR] consumer.assignTimestampsAndWatermarks(
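The expression below returns WatermarkStrategy[Nothing] rather than WatermarkStrategy[String], because nothing in the call constrains the type parameter and Scala therefore infers Nothing:
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(20))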
I solved the problem with this code:
val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
val watermark: WatermarkStrategy[MyType] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
kafkaSource.assignTimestampsAndWatermarks(watermark)
@Mayokun is right. But to keep the code simpler, you can put the type information directly after the static method:
val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20))
)
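The inference behaviour behind both fixes can be reproduced without Flink. The following is only a minimal sketch: `Strategy`, `Strategy.bounded`, and `InferenceDemo` are hypothetical stand-ins invented for illustration, not Flink classes. It isolates the inference step only and does not reproduce the overloaded `assignTimestampsAndWatermarks` call itself.

```scala
// Hypothetical stand-ins, not Flink classes: Strategy mimics an invariant
// generic type, and Strategy.bounded mimics a generic factory method whose
// arguments say nothing about T.
final class Strategy[T](val maxOutOfOrderSeconds: Long)

object Strategy {
  def bounded[T](seconds: Long): Strategy[T] = new Strategy[T](seconds)
}

object InferenceDemo {
  // No expected type and no argument that mentions T,
  // so the compiler infers T as Nothing.
  val inferred = Strategy.bounded(20)

  // Strategy is invariant, so this line compiles only because
  // `inferred` really has the type Strategy[Nothing].
  val proof: Strategy[Nothing] = inferred

  // Fix 1: pass the type argument explicitly at the call site.
  val explicit = Strategy.bounded[String](20)

  // Fix 2: declare the expected type so that T is inferred from it.
  val ascribed: Strategy[String] = Strategy.bounded(20)
}
```

Both fixes give the compiler the element type before it has to choose among the overloads, which appears to be why either one resolves the error in the question.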