Apache Flink Kafka 集成

Apache Flink Kafka Itegration

我正在尝试将 Apache Kafka 2.11-0.10.0.0 与 Apache Flink 1.1.2 集成。我正在使用 scala-shell 对其进行测试,但出现以下错误。

Class org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 未找到

我已将 org.apache.flink.streaming jar 添加到 class 路径,但这没有帮助。我一直导入到 org.apache.flink.streaming.api.checkpoint._。那仍然无济于事。下面是我在 shell

中的 运行 代码
 import org.apache.flink.streaming.connectors.kafka._
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 import org.apache.flink._
 import java.util._
 val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("zookeeper.connect", "localhost:2181")
 properties.setProperty("group.id", "test")
 val myFetcher = FlinkKafkaConsumer.FetcherType.NEW_HIGH_LEVEL
 val myHandler = FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER
 senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties, myHandler, myFetcher)).print

我也尝试过使用 FlinkKafkaConsumer081 和 FlinkKafkaConsumer082 方法而不是 FlinkKafkaConsumer 方法,但我仍然遇到相同的错误。

我确实注意到 flink-connector-kafka jar 自 2 月 16 日以来就没有更新过。这是不是用错了 jar?我确实在 maven central Flink Connector Kafka Base 2_11 中找到了。我应该改用那个罐子吗?

请帮忙!

您有版本冲突。我相信 class 在 Flink 1.0 中被删除了。所以你可能有一个旧版本的罐子。检查以确保所有内容都更新到 Flink 1.1.2