SparkR org.apache.spark.SparkException: R worker 意外退出
SparkR org.apache.spark.SparkException: R worker exited unexpectedly
我正在尝试执行 SparkR gapply
,基本上当我尝试 运行 时,我的输入文件限制在大约 300k 行,但它可以工作,但是扩展到大约 1.2m 行我在许多执行程序任务的 stderr 中出现以下重复出现的异常 - 大约 70% 的任务完成,而其他任务失败或被杀死。失败的具有相同的错误输出:
org.apache.spark.SparkException: R worker exited unexpectedly (cranshed)
at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:240)
at org.apache.spark.api.r.RRunner$$anon.next(RRunner.scala:91)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:212)
... 16 more
除了分配更多内存外,还需要考虑哪些调整参数?我相信 SparkR 不像 PySpark 或 Scala 那样广泛使用,有时它们的调整参数可能会有所不同,因此我们将不胜感激。
这是在 Databricks/AWS 集群上 运行ning - 20 个工作节点,30.5 GB 内存,每个 4 个核心。
在我们的用例中,gapply
函数在最多 10 个行数据帧上运行,最多将 20 列拆分为 4 个 R 数据帧,然后使用 R 包将其输入线性优化求解器 NlcOptim,quadprog
.
使用 .cache() 并再次尝试解决了这个问题。
我正在尝试执行 SparkR gapply
,基本上当我尝试 运行 时,我的输入文件限制在大约 300k 行,但它可以工作,但是扩展到大约 1.2m 行我在许多执行程序任务的 stderr 中出现以下重复出现的异常 - 大约 70% 的任务完成,而其他任务失败或被杀死。失败的具有相同的错误输出:
org.apache.spark.SparkException: R worker exited unexpectedly (cranshed)
at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:240)
at org.apache.spark.api.r.RRunner$$anon.next(RRunner.scala:91)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at scala.collection.Iterator$$anon.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:346)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.r.RRunner.org$apache$spark$api$r$RRunner$$read(RRunner.scala:212)
... 16 more
除了分配更多内存外,还需要考虑哪些调整参数?我相信 SparkR 不像 PySpark 或 Scala 那样广泛使用,有时它们的调整参数可能会有所不同,因此我们将不胜感激。
这是在 Databricks/AWS 集群上 运行ning - 20 个工作节点,30.5 GB 内存,每个 4 个核心。
在我们的用例中,gapply
函数在最多 10 个行数据帧上运行,最多将 20 列拆分为 4 个 R 数据帧,然后使用 R 包将其输入线性优化求解器 NlcOptim,quadprog
.
使用 .cache() 并再次尝试解决了这个问题。