Getting out of memory error while reading parquet file in spark submit job

[Stage 0:>                                                          (0 + 0) / 8]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[Stage 1:=====================================================>   (43 + 3) / 46]17/11/16 13:11:18 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 54)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/11/16 13:11:18 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-4,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/11/16 13:11:18 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 54, localhost): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Here is my code -

    import org.apache.spark.sql.SQLContext
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.log4j.Level

    val sqlContext = new SQLContext(sc)
    //sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
    log.setLevel(Level.INFO)
    val config = HBaseConfiguration.create()
    // Read the parquet file into a DataFrame
    val newDataDF = sqlContext.read.parquet(file)
    newDataDF.registerTempTable("newDataDF")
    //sqlContext.cacheTable("newDataDF")
    val result = sqlContext.sql("SELECT rec FROM newDataDF")
    // Pull every "rec" value back to the driver
    val rows = result.map(t => t(0)).collect()
    //val rows = result.map(t => t.getAs[String]("rec"))

It throws the out-of-memory error at the line val rows = result.map(t => t(0)).collect()

I have already tried all of the memory-tuning options and increased both the executor and driver memory, but nothing seems to help. Any suggestions would be greatly appreciated.

You have to increase spark.driver.memory; the default value is 1g. You can check the driver and executor memory with the --verbose flag on spark-submit. For more details, see the following link and set the memory according to your requirements: https://spark.apache.org/docs/latest/configuration.html
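For example, assuming the job is launched with spark-submit in local mode (as the log's "localhost" suggests), the memory could be raised like this; the class name, jar name and the 4g/2g sizes are placeholders to adjust to your data:

    # Hypothetical spark-submit invocation; adjust the memory sizes to your workload.
    # --verbose prints the effective driver/executor memory settings at launch.
    spark-submit \
      --master local[*] \
      --driver-memory 4g \
      --executor-memory 2g \
      --verbose \
      --class com.example.ReadParquetJob \
      read-parquet-job.jar

Note that spark.driver.memory must be set before the driver JVM starts (on the spark-submit command line or in spark-defaults.conf); setting it inside the application after the SparkContext exists has no effect.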

Well, by calling collect on the DataFrame you are telling Spark to gather all of the data onto the driver. For larger datasets this will indeed overwhelm the driver and cause an OOM.

Spark is a distributed computing framework meant for large datasets that do not fit on a single machine. There are only very few cases in which you want to call collect on a DataFrame: when you are debugging (on a small dataset), or when you know the dataset has been reduced dramatically in size by some filtering or aggregation transformations.
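As a rough sketch of the alternatives (the output path and the per-record processing are placeholders, not part of the original question): take only a few rows when debugging, and otherwise keep the data on the executors instead of collecting it to the driver:

    // Alternatives to collect(), reusing the sqlContext and "rec" column from the question.
    val result = sqlContext.sql("SELECT rec FROM newDataDF")

    // Debugging: bring only a handful of rows to the driver.
    result.take(10).foreach(println)

    // Full result: keep it distributed and write it out ("/tmp/rec_output" is a placeholder path).
    result.write.parquet("/tmp/rec_output")

    // Per-record work: do it on the executors rather than on the driver.
    result.rdd.foreachPartition { partition =>
      partition.foreach { row =>
        val rec = row(0)
        // process rec here (e.g. write it to HBase) without collecting everything to the driver
      }
    }

Any of these keeps the driver heap bounded, which is usually what you need when the full result does not fit in the driver's memory.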