Out of memory exception or worker node lost during a Spark Scala job

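I am running a spark-scala job from spark-shell. The problem is that at the end of the final stage, in the final mapper (for example, in stage 5), it allocates 50 tasks and completes 49 of them very quickly; the 50th takes 5 minutes, then reports out of memory and fails. I am using SPARK_MAJOR_VERSION=2.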

I am using the following command: spark-shell --master yarn --conf spark.driver.memory=30G --conf spark.executor.memory=40G --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=false --conf spark.sql.broadcastTimeout=36000 --conf spark.shuffle.compress=true --conf spark.executor.heartbeatInterval=3600s --conf spark.executor.instance=160

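Starting from the configuration above, I also tried setting dynamic allocation to true and starting the driver and executor memory from 1 GB. I have 6.78 TB of RAM and 1300 VCores overall (this is my entire Hadoop hardware).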

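The table I am reading is 40 GB, and I am joining 6 tables to that 40 GB table, so the total is probably around 60 GB. Spark created 4 stages for this and failed in the last one. I am using Spark SQL to execute the SQL.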

The error is as follows:

19/04/26 14:29:02 WARN HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 125967 ms exceeds timeout 120000 ms
19/04/26 14:29:02 ERROR YarnScheduler: Lost executor 2 on worker03.some.com: Executor heartbeat timed out after 125967 ms
19/04/26 14:29:02 WARN TaskSetManager: Lost task 5.0 in stage 2.0 (TID 119, worker03.some.com, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 125967 ms
19/04/26 14:29:02 WARN HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 126225 ms exceeds timeout 120000 ms
19/04/26 14:29:02 ERROR YarnScheduler: Lost executor 1 on ncednhpwrka0008.devhadoop.charter.com: Executor heartbeat timed out after 126225 ms
19/04/26 14:29:02 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_e1223_1556277056929_0976_01_000003 on host: worker03.some.com. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_e1223_1556277056929_0976_01_000003
Exit code: 52
Shell output: main : command provided 1
main : run as user is svc-bd-xdladmrw-dev
main : requested yarn user is svc-bd-xdladmrw-dev
Getting exit code file...
Creating script paths...
Writing pid file...
Writing to tmp file /data/00/yarn/local/nmPrivate/application_1556277056929_0976/container_e1223_1556277056929_0976_01_000003/container_e1223_1556277056929_0976_01_000003.pid.tmp
Writing to cgroup task files...
Creating local dirs...
Launching container...
Getting exit code file...
Creating script paths...


Container exited with a non-zero exit code 52. Last 4096 bytes of stderr :
0 in stage 2.0 (TID 119)
19/04/26 14:27:37 INFO HadoopRDD: Input split: hdfs://datadev/data/dev/HIVE_SCHEMA/somedb.db/sbscr_usge_cycl_key_xref/000000_0_copy_2:0+6623042
19/04/26 14:27:37 INFO OrcRawRecordMerger: min key = null, max key = null
19/04/26 14:27:37 INFO ReaderImpl: Reading ORC rows from hdfs://datadev/data/dev/HIVE_SCHEMA/somedb.db/sbscr_usge_cycl_key_xref/000000_0_copy_2 with {include: [true, true, true], offset: 0, length: 9223372036854775807}
19/04/26 14:29:00 ERROR Executor: Exception in task 5.0 in stage 2.0 (TID 119)
java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:237)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
19/04/26 14:29:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 119,5,main]
java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:237)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
19/04/26 14:29:00 INFO DiskBlockManager: Shutdown hook called
19/04/26 14:29:00 INFO ShutdownHookManager: Shutdown hook called
19/04/26 14:29:02 ERROR YarnScheduler: Lost executor 2 on worker03.some.com: Container marked as failed: container_e1223_1556277056929_0976_01_000003 on host: worker03.some.com. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_e1223_1556277056929_0976_01_000003
Exit code: 52
Shell output: main : command provided 1
main : run as user is svc-bd-xdladmrw-dev
main : requested yarn user is svc-bd-xdladmrw-dev
Getting exit code file...
Creating script paths...
Writing pid file...
Writing to tmp file /data/00/yarn/local/nmPrivate/application_1556277056929_0976/container_e1223_1556277056929_0976_01_000003/container_e1223_1556277056929_0976_01_000003.pid.tmp
Writing to cgroup task files...
Creating local dirs...
Launching container...
Getting exit code file...
Creating script paths...


Container exited with a non-zero exit code 52. Last 4096 bytes of stderr :
0 in stage 2.0 (TID 119)
19/04/26 14:27:37 INFO HadoopRDD: Input split: hdfs://datadev/data/dev/HIVE_SCHEMA/somedb.db/sbscr_usge_cycl_key_xref/000000_0_copy_2:0+6623042
19/04/26 14:27:37 INFO OrcRawRecordMerger: min key = null, max key = null
19/04/26 14:27:37 INFO ReaderImpl: Reading ORC rows from hdfs://datadev/data/dev/HIVE_SCHEMA/somedb.db/sbscr_usge_cycl_key_xref/000000_0_copy_2 with {include: [true, true, true], offset: 0, length: 9223372036854775807}
19/04/26 14:29:00 ERROR Executor: Exception in task 5.0 in stage 2.0 (TID 119)
java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:237)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
19/04/26 14:29:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 119,5,main]
java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:237)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
19/04/26 14:29:00 INFO DiskBlockManager: Shutdown hook called
19/04/26 14:29:00 INFO ShutdownHookManager: Shutdown hook called

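If I am doing something wrong here, such as with the memory allocation, could someone tell me? Please suggest any alternative way to get this job done without the out of memory exception or the worker node lost error. Any help or information is much appreciated.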

Thanks!

at the end of the final stage and final mapper like in stage 5 it allocates 50 and completed 49 very quickly and at the 50th it takes 5 minutes and says that out of memory and fails.

The table I am reading is 40GB and I am joining 6 tables to that 40GB table

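This sounds to me like skewed data: most of the keys used for the join end up in a single partition. So instead of spreading the work across multiple executors, Spark uses only one and overloads it. That hurts both memory consumption and performance. There are several ways to handle it: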

Skewed dataset join in Spark?
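
One common way to handle a skewed join is key salting. The following is only a minimal sketch, assuming the DataFrame API and an inner join on a single key column; the object name `SkewSketch`, the helper names `topKeys` and `saltedJoin`, the temporary column name `salt`, and the `buckets` parameter are all hypothetical and not taken from the question. `topKeys` can help confirm whether a few keys really dominate, and `saltedJoin` spreads each hot key over several sub-keys so that its rows no longer land in a single task.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

object SkewSketch {
  // Return the n most frequent values of the join key.
  // A few keys with very large counts suggest the join is skewed.
  def topKeys(df: DataFrame, key: String, n: Int = 20): DataFrame =
    df.groupBy(col(key)).count().orderBy(desc("count")).limit(n)

  // Salted inner join (assumes neither input already has a column named "salt").
  // Each row of the large side gets a random salt in [0, buckets), and the
  // small side is replicated once per salt value, so every large-side row
  // still finds its match while a hot key is split across `buckets` groups.
  def saltedJoin(large: DataFrame, small: DataFrame,
                 key: String, buckets: Int): DataFrame = {
    val saltedLarge = large.withColumn("salt", (rand() * buckets).cast("int"))
    val saltValues  = array((0 until buckets).map(i => lit(i)): _*)
    val saltedSmall = small.withColumn("salt", explode(saltValues))
    saltedLarge.join(saltedSmall, Seq(key, "salt")).drop("salt")
  }
}
```

In spark-shell this could be pasted with `:paste` and then called as, for example, `SkewSketch.saltedJoin(bigDf, dimDf, "some_key", 16)`, where `bigDf`, `dimDf`, and `some_key` stand in for your own tables and join column. The trade-off is that the small side grows by a factor of `buckets`, so this only makes sense when the replicated side is small compared to the skewed one.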