Spark Streaming application fails with KafkaException: String exceeds the maximum size or with IllegalArgumentException
TL;DR:
My very simple Spark Streaming application fails in the driver with a "KafkaException: String exceeds the maximum size". I see the same exception in the executor, but somewhere down the executor's logs I also found an IllegalArgumentException with no other information in it.
Full problem:
I am using Spark Streaming to read some messages from a Kafka topic.
This is what I'm doing:
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("testName")
val streamingContext = new StreamingContext(new SparkContext(conf), Milliseconds(millis))
val kafkaParams = Map(
  "metadata.broker.list" -> "somevalidaddresshere:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("data")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext,
  kafkaParams,
  topics
).map(_._2) // only need the values not the keys
All I do with the Kafka data is print it using:
stream.print()
My application obviously has more code than this, but in order to locate the problem I stripped everything I possibly could from it.
I am trying to run this code on YARN.
This is my spark-submit line:
./spark-submit --class com.somecompany.stream.MainStream --master yarn --deploy-mode cluster myjar.jar hdfs://some.hdfs.address.here/user/spark/streamconfig.properties
The streamconfig.properties file is just a regular properties file and is probably irrelevant to the problem here.
After trying to execute the application, it fails pretty quickly with the following exception in the driver:
16/05/10 06:15:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, some.hdfs.address.here): kafka.common.KafkaException: String exceeds the maximum size of 32767.
at kafka.api.ApiUtils$.shortStringLength(ApiUtils.scala:73)
at kafka.api.TopicData$.headerSize(FetchResponse.scala:107)
at kafka.api.TopicData.<init>(FetchResponse.scala:113)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:103)
at kafka.api.FetchResponse$$anonfun.apply(FetchResponse.scala:170)
at kafka.api.FetchResponse$$anonfun.apply(FetchResponse.scala:169)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1328)
at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1328)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I don't even see my own code in the stack trace.
Examining the executor, I found the same exception as in the driver, but also, buried deep in its logs, the following exception:
16/05/10 06:40:47 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 8)
java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at kafka.api.FetchResponsePartitionData$.readFrom(FetchResponse.scala:38)
at kafka.api.TopicData$$anonfun.apply(FetchResponse.scala:100)
at kafka.api.TopicData$$anonfun.apply(FetchResponse.scala:98)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.TopicData$.readFrom(FetchResponse.scala:98)
at kafka.api.FetchResponse$$anonfun.apply(FetchResponse.scala:170)
at kafka.api.FetchResponse$$anonfun.apply(FetchResponse.scala:169)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1328)
at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1328)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I have no idea what the IllegalArgumentException refers to, since it carries no information at all.
The Spark version my YARN cluster is using is 1.6.0. I also verified that my pom contains Spark 1.6.0 and not an earlier version; the scope is "provided".
I manually read data from this exact topic and it is just plain JSON. The messages are not big at all, definitely smaller than 32767. Also, I am able to read this data with the regular command-line consumer, which is strange.
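For reference, the manual check can be done with the stock console consumer roughly like this (a sketch for a Kafka 0.8.x broker, where the consumer connects through ZooKeeper; the address is a placeholder, not my actual setup):
./kafka-console-consumer.sh --zookeeper somevalidaddresshere:2181 --topic data --from-beginning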
Googling this exception sadly didn't provide any useful information.
Does anyone have an idea how to figure out what exactly the problem is here?
Thanks in advance.
After a lot of digging I think I found what the problem was. I was running Spark on YARN (1.6.0-cdh5.7.0). Cloudera ships the newer Kafka client (version 0.9), which has a wire-protocol change compared to earlier versions. Our Kafka brokers, however, are version 0.8.2, so the fetch responses cannot be parsed correctly, which is what produces the exceptions above.
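One way to resolve such a client/broker mismatch is to make sure a Kafka 0.8.2 client ends up on the classpath instead of the CDH-bundled 0.9 one, for example by pinning it explicitly in the pom. This is a sketch only, not necessarily the exact change made here; the artifact names and versions below are assumptions based on Spark 1.6.0 built for Scala 2.10:
<!-- hypothetical pom.xml excerpt: force a Kafka client that matches the 0.8.2 brokers -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.2</version>
</dependency>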