"GC overhead limit exceeded" 将大型数据集缓存到 spark 内存中(通过 sparklyr 和 RStudio)

"GC overhead limit exceeded" on cache of large dataset into spark memory (via sparklyr & RStudio)

我对尝试使用的大数据技术还很陌生,但到目前为止,我已经设法在 RStudio 中设置 sparklyr 以连接到独立的 Spark 集群。数据存储在 Cassandra 中,我可以成功地将大型数据集放入 Spark 内存(缓存)以 运行 进一步分析。

然而,最近我在将一个特别大的数据集导入 Spark 内存时遇到了很多麻烦,即使集群应该有足够多的资源(60 个内核,200GB RAM)来处理其大小的数据集.

我认为通过将缓存的数据限制为仅几个 select 感兴趣的列,我可以解决这个问题(使用我之前查询的答案代码 ),但它确实如此不是。发生的事情是我本地机器上的 jar 进程逐渐占用所有本地 RAM 和 CPU 资源,整个进程冻结,并且在集群上执行器不断被删除和重新添加。奇怪的是,即使我 select 只有 1 行用于缓存(这应该使这个数据集比我在 Spark 内存中缓存没有问题的其他数据集小得多)也会发生这种情况。

我查看了日志,这些似乎是该过程早期唯一的信息errors/warnings:

17/03/06 11:40:27 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 33813 because its task set is gone (this is likely the result of receiving duplicate task finished status updates) or its executor has been marked as failed.
17/03/06 11:40:27 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 8167), so marking it as still running
...
17/03/06 11:46:59 WARN TaskSetManager: Lost task 3927.3 in stage 0.0 (TID 54882, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 3863), so marking it as still running
17/03/06 11:46:59 WARN TaskSetManager: Lost task 4300.3 in stage 0.0 (TID 54667, 213.248.241.186, executor 100): ExecutorLostFailure (executor 100 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 167626 ms
17/03/06 11:46:59 INFO DAGScheduler: Resubmitted ShuffleMapTask(0, 14069), so marking it as still running

然后在大约 20 分钟后,整个作业崩溃了:

java.lang.OutOfMemoryError: GC overhead limit exceeded

我更改了我的连接配置以增加心跳间隔(spark.executor.heartbeatInterval: '180s'),并且了解了如何通过更改 yarn 集群上的设置(使用 spark.yarn.executor.memoryOverhead)来增加 memoryOverhead,但没有在一个独立的集群上。

在我的配置文件中,我通过一次添加以下每个设置进行了试验(none 其中有效):

spark.memory.fraction: 0.3
spark.executor.extraJavaOptions: '-Xmx24g'
spark.driver.memory: "64G"
spark.driver.extraJavaOptions: '-XX:MaxHeapSize=1024m'
spark.driver.extraJavaOptions: '-XX:+UseG1GC'

更新:我当前的完整 yml 配置文件如下:

default:
# local settings
  sparklyr.sanitize.column.names: TRUE
  sparklyr.cores.local: 3
  sparklyr.shell.driver-memory: "8G"

# remote core/memory settings
  spark.executor.memory: "32G"
  spark.executor.cores: 5
  spark.executor.heartbeatInterval: '180s'
  spark.ext.h2o.nthreads: 10
  spark.cores.max: 30
  spark.memory.storageFraction: 0.6
  spark.memory.fraction: 0.3
  spark.network.timeout: 300
  spark.driver.extraJavaOptions: '-XX:+UseG1GC'

# other configs for spark
  spark.serializer: org.apache.spark.serializer.KryoSerializer
  spark.executor.extraClassPath: /var/lib/cassandra/jar/guava-18.0.jar

# cassandra settings
  spark.cassandra.connection.host: <cassandra_ip>
  spark.cassandra.auth.username: <cassandra_login>
  spark.cassandra.auth.password: <cassandra_pass>
  spark.cassandra.connection.keep_alive_ms: 60000

# spark packages to load
  sparklyr.defaultPackages: 
  - "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M1"
  - "com.databricks:spark-csv_2.11:1.3.0"
  - "com.datastax.cassandra:cassandra-driver-core:3.0.2"
  - "com.amazonaws:aws-java-sdk-pom:1.10.34"

所以我的问题是:

  1. 有人知道在这种情况下该怎么做吗?
  2. 是否有我可以更改的配置设置来帮助解决这个问题?
  3. 或者,有没有办法将cassandra数据导入到 以 RStudio/sparklyr 作为驱动程序的批次?
  4. 或者,有没有一种方法可以 munge/filter/edit 将数据放入缓存中,从而使结果 table 更小(类似于使用 SQL 查询,但使用更复杂的 dplyr 语法)?

好的,我终于成功了!

我最初尝试过@user6910411 的建议来减小 cassandra 输入拆分大小,但同样失败了。在玩了很多其他东西之后,今天我尝试朝相反的方向更改该设置:

spark.cassandra.input.split.size_in_mb: 254 

通过增加拆分大小,spark 任务更少,因此开销和对 GC 的调用更少。成功了!