Spark GraphFrames High Shuffle read/write
Spark GraphFrames High Shuffle read/write
您好,我已经使用顶点和边文件创建了图形。图的大小为 600GB。我正在使用 Spark GraphFrames 的主题特征查询此图。
我已经设置了一个 AWS EMR 集群来查询图形。
集群详细信息:- 1 master 和 8 slaves
主节点:
m5.xlarge
4 vCore, 16 GiB memory, EBS only storage
EBS Storage:64 GiB
从节点:
m5.4xlarge
16 vCore, 64 GiB memory, EBS only storage
EBS Storage:256 GiB (per instance)
我面临非常高的随机读取 (3.4TB) 和写入 (2TB),这会影响性能,执行 10 queries.Is 大约需要 50 分钟,有任何方法可以减少这种高随机播放。
以下是我的火花代码:-
val spark = SparkSession.builder.appName("SparkGraph POC").getOrCreate()
val g:GraphFrame = GraphFrame(vertexDf, edgeDf)
//queries
val q1 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c)")
q1.filter(
" r1.relationship = 'knows' and" +
" r2.relationship = 'knows'").distinct()
.createOrReplaceTempView("q1table")
spark.sql("select a.id as a_id,a.name as a_name," +
"b.id as b_id,b.name as b_name," +
"c.id as c_id,c.name as c_name from q1table")
.write
.option("quote", "\"")
.option("escape", "\"")
.option("header","true")
.csv(resFilePath + "/q1")
spark.catalog.uncacheTable("q1table")
val q2 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c); (c)-[r3]->(d); (d)-[r4]->(e)")
q2.filter(
" a.name = 'user1' and" +
" e.name = 'user4' and" +
" r1.relationship = 'knows' and" +
" r2.relationship = 'knows' and" +
" r3.relationship = 'knows' and" +
" r4.relationship = 'knows'").distinct()
.createOrReplaceTempView("q2table")
spark.sql("select a.id as a_id, a.name as a_name ," +
"e.id as e_id, e.name as e_name from q2table")
.write
.option("quote", "\"")
.option("escape", "\"")
.option("header","true")
.csv(resFilePath + "/q2")
spark.catalog.uncacheTable("q2table")
spark.stop()
Graphframes 的实现问题在于它使内部数据帧的自连接次数与您在图案上使用的次数一样多。这意味着随着链条长度的增加,你会有更多的洗牌
您可以在 https://www.waitingforcode.com/apache-spark-graphframes/motifs-finding-graphframes/read
查看更多详细信息
我也尝试过类似的方法,发现当链的长度大于 12 时,即使我增加了资源,Spark 也开始没有响应并且与执行程序的连接丢失。
如果您正在尝试这样做,我建议您改用图数据库。
希望对您有所帮助
您好,我已经使用顶点和边文件创建了图形。图的大小为 600GB。我正在使用 Spark GraphFrames 的主题特征查询此图。 我已经设置了一个 AWS EMR 集群来查询图形。
集群详细信息:- 1 master 和 8 slaves
主节点:
m5.xlarge
4 vCore, 16 GiB memory, EBS only storage
EBS Storage:64 GiB
从节点:
m5.4xlarge
16 vCore, 64 GiB memory, EBS only storage
EBS Storage:256 GiB (per instance)
我面临非常高的随机读取 (3.4TB) 和写入 (2TB),这会影响性能,执行 10 queries.Is 大约需要 50 分钟,有任何方法可以减少这种高随机播放。
以下是我的火花代码:-
val spark = SparkSession.builder.appName("SparkGraph POC").getOrCreate()
val g:GraphFrame = GraphFrame(vertexDf, edgeDf)
//queries
val q1 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c)")
q1.filter(
" r1.relationship = 'knows' and" +
" r2.relationship = 'knows'").distinct()
.createOrReplaceTempView("q1table")
spark.sql("select a.id as a_id,a.name as a_name," +
"b.id as b_id,b.name as b_name," +
"c.id as c_id,c.name as c_name from q1table")
.write
.option("quote", "\"")
.option("escape", "\"")
.option("header","true")
.csv(resFilePath + "/q1")
spark.catalog.uncacheTable("q1table")
val q2 = g.find(" (a)-[r1]->(b); (b)-[r2]->(c); (c)-[r3]->(d); (d)-[r4]->(e)")
q2.filter(
" a.name = 'user1' and" +
" e.name = 'user4' and" +
" r1.relationship = 'knows' and" +
" r2.relationship = 'knows' and" +
" r3.relationship = 'knows' and" +
" r4.relationship = 'knows'").distinct()
.createOrReplaceTempView("q2table")
spark.sql("select a.id as a_id, a.name as a_name ," +
"e.id as e_id, e.name as e_name from q2table")
.write
.option("quote", "\"")
.option("escape", "\"")
.option("header","true")
.csv(resFilePath + "/q2")
spark.catalog.uncacheTable("q2table")
spark.stop()
Graphframes 的实现问题在于它使内部数据帧的自连接次数与您在图案上使用的次数一样多。这意味着随着链条长度的增加,你会有更多的洗牌
您可以在 https://www.waitingforcode.com/apache-spark-graphframes/motifs-finding-graphframes/read
查看更多详细信息我也尝试过类似的方法,发现当链的长度大于 12 时,即使我增加了资源,Spark 也开始没有响应并且与执行程序的连接丢失。
如果您正在尝试这样做,我建议您改用图数据库。
希望对您有所帮助