Local Kafka Application failing with: NoSuchMethodError: createEphemeral
I am trying to run my Spark application in local mode. To set everything up, I followed this tutorial: http://blog.d2-si.fr/2015/11/05/apache-kafka-3/ (in French), which walks through every step of building a local Kafka/ZooKeeper environment.

I am working in IntelliJ, with the following Spark configuration:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]")

These are my run-configuration arguments, for the consumer:

"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1"

And for the producer:

"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300

Beforehand, I checked that the consumer receives the producer's input using the default console-producer and console-consumer of kafka_2.10-0.9.0.1; it does.

However, I am running into the following error:

java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921)
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348)
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun.apply(ZookeeperConsumerConnector.scala:839)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun.apply(ZookeeperConsumerConnector.scala:833)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$$anonfun$apply$mcV$sp.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance.apply$mcV$sp(ZookeeperConsumerConnector.scala:627)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance.apply(ZookeeperConsumerConnector.scala:627)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance.apply(ZookeeperConsumerConnector.scala:627)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:254)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:156)
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:575)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun.apply(ReceiverTracker.scala:565)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.SparkContext$$anonfun.apply(SparkContext.scala:1992)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I have not managed to solve this. I thought it was a ZooKeeper configuration error, but after comparing with a working version of the application, which runs with the same configuration files on another machine, that no longer seems to be the case.

It looks like you have a dependency problem here.

Check the version of the com.101tec:zkclient library on your classpath. Kafka needs version 0.7; the three-argument createEphemeral overload in your stack trace does not exist in the older zkclient releases that Kafka 0.8.x pulls in transitively, which is exactly what produces a NoSuchMethodError like this.
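
If you build with sbt, a minimal sketch of how to pin that version (the Spark and Kafka versions below are assumptions; adjust them to your build):

// build.sbt -- sketch only. Exclude the zkclient copies pulled in
// transitively and force 0.7, so a single version wins on the classpath.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
    exclude("com.101tec", "zkclient"),
  "org.apache.kafka" %% "kafka" % "0.9.0.1"
    exclude("com.101tec", "zkclient"),
  "com.101tec" % "zkclient" % "0.7"
)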

Also, since kafka_2.10-0.9.0.1 the producer and consumer APIs no longer use ZooKeeper. In your case, Spark Streaming seems to be using the 0.8 version of the Kafka API.
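
For comparison, a minimal sketch of the new 0.9 consumer API, which connects to the brokers directly rather than to a ZooKeeper quorum; the topic and group id are taken from your question, the rest is illustrative:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Note: bootstrap.servers points at a broker, not at ZooKeeper.
val props = new Properties()
props.put("bootstrap.servers", "127.0.0.1:9092")
props.put("group.id", "zumbaApp-gpId")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Arrays.asList("D2SI"))

// Poll once and print whatever arrived.
val records = consumer.poll(1000)
records.asScala.foreach(r => println(r.value()))
consumer.close()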