Spark Streaming Kafka integration in CDH 5.8.3 in yarn-cluster mode

I'm having a strange issue running a Spark Streaming job that reads from Kafka. I'm on the CDH 5.8.3 distribution: the Spark version is 1.6.0 and the Kafka version is 0.9.0.

My code is simple:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String]("bootstrap.servers" -> brokersList, "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))

If I run it in yarn-client mode I get no errors, but if I run it in yarn-cluster mode I get an exception. My launch command is:

spark-submit --master yarn-cluster --files /etc/hbase/conf/hbase-site.xml --num-executors 5 --executor-memory 4G --jars (somejars for HBase interaction) --class mypackage.MyClass myJar.jar

But I get this error:

java.lang.ClassCastException: kafka.cluster.Broker cannot be cast to kafka.cluster.BrokerEndPoint
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun$$anonfun$apply$$anonfun$apply.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun$$anonfun$apply.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun$$anonfun$apply.apply(KafkaCluster.scala:87)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun.apply(KafkaCluster.scala:87)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$$anonfun.apply(KafkaCluster.scala:86)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun.apply(KafkaCluster.scala:86)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun.apply(KafkaCluster.scala:85)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:85)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:179)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:161)
at org.apache.spark.streaming.kafka.KafkaCluster.getEarliestLeaderOffsets(KafkaCluster.scala:155)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun.apply(KafkaUtils.scala:213)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:523)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at myPackage.Ingestion$.createStreamingContext(Ingestion.scala:120)
at myPackage.Ingestion$$anonfun.apply(Ingestion.scala:55)
at myPackage.Ingestion$$anonfun.apply(Ingestion.scala:55)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
at myPackage.Ingestion$.main(Ingestion.scala:55)
at myPackage.Ingestion.main(Ingestion.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:542)

After some searching online, I came to think this is a version issue, but I can't figure out why it happens, since the same jar is used when running in yarn-client and yarn-cluster mode.

Do you have any ideas?

Thanks, Marco

It looks like Spark Streaming 1.6 is compatible with Kafka 0.8 (see the documentation).

My guess is that you are using the Kafka 0.9 client, which is picked up from your jar in yarn-client mode, but when you switch to yarn-cluster mode the default Kafka client (0.8.2.1) is used instead.
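One way to verify this (my suggestion, not something from the original post) is to log which jar the Broker class is actually loaded from, in both deploy modes:

// Hypothetical diagnostic: prints the jar that kafka.cluster.Broker was loaded from.
// Run the job once in yarn-client and once in yarn-cluster mode and compare the output.
val src = classOf[kafka.cluster.Broker].getProtectionDomain.getCodeSource
println(if (src != null) src.getLocation else "loaded from the bootstrap classpath")

If the two paths differ, the driver is resolving the class from different jars depending on the deploy mode.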

Am I right? If so, could you try removing the Kafka client dependency from your build and using the default client provided by spark-streaming-kafka? (The 0.8 client should work with 0.9 brokers.) A build-file sketch follows below.
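As a sketch, assuming an sbt build (the original post doesn't show the build file), the idea is to declare only the Spark artifacts and let spark-streaming-kafka pull in its own Kafka 0.8.2.1 client transitively:

// Hypothetical build.sbt fragment; versions should match your CDH 5.8.3 parcel.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
  // No explicit org.apache.kafka dependency here: the 0.8.2.1 client
  // arrives transitively with spark-streaming-kafka.
)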

For those who may run into the same problem: our issue turned out to be caused by the Splice Machine installation. Splice Machine requires its jars to be set in the YARN extra classpath configuration, and among them is a spark-assembly built by them.

We are now trying to find a way to make everything run without uninstalling Splice Machine from our cluster.
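A possible workaround to experiment with (an untested sketch, not something confirmed in this thread) is telling Spark to prefer the application's own classes over the cluster-side extra jars, using the userClassPathFirst settings, which exist in Spark 1.6 but are marked experimental:

spark-submit --master yarn-cluster \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --files /etc/hbase/conf/hbase-site.xml \
  --class mypackage.MyClass myJar.jar

Being experimental, these flags can introduce their own classpath conflicts, so they are worth trying on a non-production job first.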

Thanks.