Spark 作业超出 RAM (java.lang.OutOfMemoryError),即使有很多。 xmx 太低?

Spark job out of RAM (java.lang.OutOfMemoryError), even though there's plenty. xmx too low?

我的 Spark 作业正在 java.lang.OutOfMemoryError,尽管只有总内存的 20% 在使用中。

我试过几种配置:

我的数据集包含 180 万条记录,从主节点上的本地 json 文件读取。 json 格式的整个数据集为 7GB。我尝试执行的作业涉及一个简单的计算,然后是 reduceByKey。没有什么特别的。这项工作在我只有 32GB ram (xmx28g) 的单台家用计算机上运行良好,尽管它需要一些缓存到磁盘。

作业是通过 spark-submit 在服务器本地 (SSH) 提交的。

可以在此处查看堆栈跟踪和 Spark 配置:https://pastee.org/sgda

代码

val rdd = sc.parallelize(Json.load()) // load everything
  .map(fooTransform)                  // apply some trivial transformation
  .flatMap(_.bar.toSeq)               // flatten results
  .map(c => (c, 1))                   // count 
  .reduceByKey(_ + _)
  .sortBy(_._2)
log.v(rdd.collect.map(toString).mkString("\n"))

问题的根源在于您应该尝试将更多 I/O 卸载到分布式任务,而不是在驱动程序和辅助任务之间来回传送。虽然有时哪些调用是驱动程序本地调用以及哪些调用描述了分布式操作可能并不明显,但经验法则包括避免 parallelizecollect 除非您绝对需要将所有数据放在一个地方。您可以 Json.load()parallelize 的数据量将在任何可能的最大机器类型上最大化,而使用像 sc.textFile 这样的调用理论上可以毫无问题地扩展到数百 TB 甚至 PB。

您的情况的短期解决方法是尝试通过 spark-submit --conf spark.driver.memory=40g ... 或该范围内的值。 Dataproc 默认分配不到四分之一的机器给驱动程序内存,因为集群通常必须支持 运行 多个并发作业,并且还需要在主节点上为 HDFS namenode 和 YARN 资源管理器留出足够的内存。

从长远来看,您可能想要试验如何将 JSON 数据直接作为 RDD 加载,而不是将其加载到单个驱动程序中并使用 parallelize 来分发它,因为这您可以通过让任务并行加载数据来显着加快输入读取时间的方式(并消除警告 Stage 0 contains a task of very large size,这可能与将大数据从驱动程序传输到工作任务有关)。

类似地,不用 collect 然后在驱动程序上完成事情,你可以做 sc.saveAsTextFile 之类的事情以分布式方式保存,而不会在一个地方出现瓶颈。

将输入读取为 sc.textFile 将假定行分隔 JSON,您可以在某些 map 任务中进行解析,或者您可以尝试使用 sqlContext.read.json。出于调试目的,通常无需使用 collect() 即可调用 take(10) 来查看某些记录,而无需将所有记录发送给驱动程序。