Spark Graphframes 大型数据集和内存问题

Spark Graphframes large dataset and memory Issues

我想 运行 在相对较大的图 35 亿个节点 900 亿条边上进行 pagerank。我一直在尝试不同的簇大小,以使其达到 运行。但首先是代码:

from pyspark.sql import SparkSession
import graphframes

spark = SparkSession.builder.getOrCreate()

edges_DF = spark.read.parquet('s3://path/to/edges') # 1.4TB total size
verts_DF   = spark.read.parquet('s3://path/to/verts') # 25GB total size

graph_GDF = graphframes.GraphFrame(verts_DF, edges_DF)
graph_GDF = graph_GDF.dropIsolatedVertices()

result_df   = graph_GDF.pageRank(resetProbability=0.15, tol=0.1)
pagerank_df = result_df.vertices
pagerank_df.write.parquet('s3://path/to/output', mode='overwrite')

我从一开始就经历了很多垃圾收集问题。所以我为集群尝试了不同的设置和大小。主要关注了两篇文章:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

我运行亚马逊EMR上的集群。这些是我目前使用的相关设置:

"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.6,graphframes:graphframes:0.7.0-spark2.4-s_2.11",
"spark.dynamicAllocation.enabled": "false",
"spark.network.timeout":"1600s",
"spark.executor.heartbeatInterval":"120s",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.shuffle.partitions":"1216"

"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"

"maximizeResourceAllocation": "true"

"fs.s3.maxConnections": "5000",
"fs.s3.consistent": "true",
"fs.s3.consistent.throwExceptionOnInconsistency":"false",
"fs.s3.consistent.retryPolicyType":"fixed",
"fs.s3.consistent.retryPeriodSeconds":"10"

我尝试了簇大小,我的第一个似乎有效的实验是 具有以下参数的集群:--deploy-mode cluster --num-executors 75 --executor-cores 5 --executor-memory 36g --driver-memory 36g --driver-cores 5

使用此配置 GC 时间很短,一切正常,但由于它是一个测试集群,它有非常 "little" 的内存,总共 2.7 TB,也在一段时间后我得到了 ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Container from a bad node Exit status: 137. 我以为是因为我把 node 留给了小内存。所以我重新 运行 整个事情,但这次 --executor-cores 5 --executor-memory 35g 马上我的 GC 问题,背部和我的集群表现得非常奇怪。所以我想我理解了 GC 次高的原因不是每个执行程序内存不足。

我启动的下一个集群具有以下参数:--deploy-mode cluster --num-executors 179 --executor-cores 5 --executor-memory 45g --driver-memory 45g --driver-cores 5

所以一个更大的集群和每个执行者的内存和以前一样。一切都 运行 顺利,我通过 ganglia 注意到第一步需要大约 5.5 TB 的 ram。

虽然我理解使用较少的集群可用内核并扩大每个执行程序的内存会使程序更快的问题,但我猜这与 verts_DF 大约 25gb 的大小和这样它将适合每个执行程序的内存并为计算留出空间(25GB * 179 几乎是 5.5TB)。

所以我启动的下一个集群具有相同数量的节点,但我将执行器的大小调整为:--num-executors 119 --executor-cores 5 --executor-memory 75g

瞬间所有的问题都回来了!通过 gangliaGC 次集群挂起 我可以看到 RAM 填充了 9 个可用 TB 中的 8 个。我很困惑。 我返回并再次启动 --num-executors 179 --executor-cores 5 --executor-memory 45g 集群,幸运的是,使用 EMR 很容易做到这一点,因为我可以克隆它。但现在这个配置也不起作用。高 GC 倍集群立即达到 8TB 已用内存。

这是怎么回事?感觉就像我在玩轮盘赌一样,有时相同的配置有效,有时却无效?

如果有人在一段时间后仍然偶然发现这个问题,它就会意识到问题出在如何 graphxgraphframes 加载图形上。两者都试图生成它们正在加载的图的所有三元组,其中非常大的图解决了 OOM 错误,因为具有 35 亿个节点和 700 亿条边的图已经破坏了其中的许多。 我通过在 pyspark 中实现 pagerank 编写了一个解决方案。它肯定不如 scala 实施那么快,但它可以扩展并且不会 运行 进入所描述的三元组问题。 我在 github 上发布了它 https://github.com/thagorx/spark_pagerank

如果您是运行单机版,带有pyspark和graphframes,您可以通过执行以下命令启动pysparkREPL

pyspark --driver-memory 2g --executor-memory 6g --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11

请务必根据最新发布的 Spark 版本

适当更改 SPARK_VERSION 环境变量