为什么 Spark Streaming 由于 java.lang.OutOfMemoryError 而在字符串解码时失败?
Why does Spark Streaming fail at String decoding due to java.lang.OutOfMemoryError?
我 运行 一个 Spark Streaming (createStream
API) 应用程序在 3 个节点的 YARN 集群上,每个节点有 128G RAM (!) 该应用程序从 Kafka 主题读取记录并写入 HDFS。
大多数时候应用程序 fails/is 由于 Java 堆错误而被杀死(主要是接收器失败),无论我配置多少内存 executor/driver。
16/11/23 13:00:20 WARN ReceiverTracker: Error reported by receiver for stream 0: Error handling message; exiting - java.lang.OutOfMemoryError: Java heap space
at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
at java.lang.StringCoding.decode(StringCoding.java:193)
at java.lang.String.<init>(String.java:426)
at java.lang.String.<init>(String.java:491)
at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:50)
at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:42)
at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:137)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
如果您正在使用 KafkaUtil.createStream(....) 单个接收器将在 spark 执行器中 运行 并且如果主题是分区的,每个接收器线程 运行划分。因此,如果您的流具有较大的字符串对象并且频率很高并且所有线程共享单个执行程序内存,您可能会遇到 OOM 问题。
以下是可能的解决方案。
由于接收器内存不足,作业失败,首先检查批处理和块间隔属性。如果批次间隔更小(例如 5 分钟),请尝试使用更小的值(例如(100 毫秒))。
限制每秒接收记录的速率为"spark.streaming.receiver.maxRate",同时确保
"spark.streaming.unpersist" 值为 "true".
- 您可以使用 KafkaUtil.KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](streamingContext, kafkaParams,
话题)。在这种情况下,而不是单个接收器火花执行器
直接连接kafka partition leads接收数据
并行(每个 kfka 分区是一个 KafkaRDD 分区)。不像
单个接收器执行器中的多个线程这里是多个
执行者将 运行 并行并分配负载。
我 运行 一个 Spark Streaming (createStream
API) 应用程序在 3 个节点的 YARN 集群上,每个节点有 128G RAM (!) 该应用程序从 Kafka 主题读取记录并写入 HDFS。
大多数时候应用程序 fails/is 由于 Java 堆错误而被杀死(主要是接收器失败),无论我配置多少内存 executor/driver。
16/11/23 13:00:20 WARN ReceiverTracker: Error reported by receiver for stream 0: Error handling message; exiting - java.lang.OutOfMemoryError: Java heap space
at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
at java.lang.StringCoding.decode(StringCoding.java:193)
at java.lang.String.<init>(String.java:426)
at java.lang.String.<init>(String.java:491)
at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:50)
at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:42)
at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:137)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
如果您正在使用 KafkaUtil.createStream(....) 单个接收器将在 spark 执行器中 运行 并且如果主题是分区的,每个接收器线程 运行划分。因此,如果您的流具有较大的字符串对象并且频率很高并且所有线程共享单个执行程序内存,您可能会遇到 OOM 问题。
以下是可能的解决方案。
由于接收器内存不足,作业失败,首先检查批处理和块间隔属性。如果批次间隔更小(例如 5 分钟),请尝试使用更小的值(例如(100 毫秒))。
限制每秒接收记录的速率为"spark.streaming.receiver.maxRate",同时确保 "spark.streaming.unpersist" 值为 "true".
- 您可以使用 KafkaUtil.KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, 话题)。在这种情况下,而不是单个接收器火花执行器 直接连接kafka partition leads接收数据 并行(每个 kfka 分区是一个 KafkaRDD 分区)。不像 单个接收器执行器中的多个线程这里是多个 执行者将 运行 并行并分配负载。