Apache Flink Kafka 集成
Apache Flink Kafka Itegration
我正在尝试将 Apache Kafka 2.11-0.10.0.0 与 Apache Flink 1.1.2 集成。我正在使用 scala-shell 对其进行测试,但出现以下错误。
Class org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 未找到
我已将 org.apache.flink.streaming jar 添加到 class 路径,但这没有帮助。我一直导入到 org.apache.flink.streaming.api.checkpoint._。那仍然无济于事。下面是我在 shell
中的 运行 代码
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink._
import java.util._
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val myFetcher = FlinkKafkaConsumer.FetcherType.NEW_HIGH_LEVEL
val myHandler = FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER
senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties, myHandler, myFetcher)).print
我也尝试过使用 FlinkKafkaConsumer081 和 FlinkKafkaConsumer082 方法而不是 FlinkKafkaConsumer 方法,但我仍然遇到相同的错误。
我确实注意到 flink-connector-kafka jar 自 2 月 16 日以来就没有更新过。这是不是用错了 jar?我确实在 maven central Flink Connector Kafka Base 2_11 中找到了。我应该改用那个罐子吗?
请帮忙!
您有版本冲突。我相信 class 在 Flink 1.0 中被删除了。所以你可能有一个旧版本的罐子。检查以确保所有内容都更新到 Flink 1.1.2
我正在尝试将 Apache Kafka 2.11-0.10.0.0 与 Apache Flink 1.1.2 集成。我正在使用 scala-shell 对其进行测试,但出现以下错误。
Class org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 未找到
我已将 org.apache.flink.streaming jar 添加到 class 路径,但这没有帮助。我一直导入到 org.apache.flink.streaming.api.checkpoint._。那仍然无济于事。下面是我在 shell
中的 运行 代码 import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink._
import java.util._
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val myFetcher = FlinkKafkaConsumer.FetcherType.NEW_HIGH_LEVEL
val myHandler = FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER
senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties, myHandler, myFetcher)).print
我也尝试过使用 FlinkKafkaConsumer081 和 FlinkKafkaConsumer082 方法而不是 FlinkKafkaConsumer 方法,但我仍然遇到相同的错误。
我确实注意到 flink-connector-kafka jar 自 2 月 16 日以来就没有更新过。这是不是用错了 jar?我确实在 maven central Flink Connector Kafka Base 2_11 中找到了。我应该改用那个罐子吗?
请帮忙!
您有版本冲突。我相信 class 在 Flink 1.0 中被删除了。所以你可能有一个旧版本的罐子。检查以确保所有内容都更新到 Flink 1.1.2