Graphframes PageRank 性能:PySpark 与 sparklyr

Graphframes PageRank performance: PySpark vs sparklyr

我正在使用来自 Python 和 R 的 Spark/GraphFrames。当我在来自 Python 的小图上调用 PageRank 时,它比使用 R 慢很多。 考虑到 Python 和 R 都在调用相同的库,为什么 Python 这么慢?

我将在下面尝试演示问题。

Spark/GraphFrames 包含图表示例,例如 friends,如 this link 中所述。这是一个非常小的有向图,有6个节点和8条边(注意例子和其他版本的GraphFrames不一样)。

当我用 R 运行 下面的一段代码时,计算 PageRank 几乎不需要时间:

library(graphframes)
library(sparklyr)
library(dplyr)

nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')

sc <- spark_connect(master = "local", version = "2.1.1")

nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)

graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)

results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)

print(results)

当我运行相当于用PySpark时,需要10到30分钟:

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()

我尝试了不同版本的 Spark 和 GraphFrames Python 以与 R 的设置保持一致。

一般来说,当您看到在不同后端显然等效的代码片段之间存在如此显着的运行时差异时,您必须考虑两种可能性:

  • 没有真正等效的。尽管底层使用了相同的 Java 库,但不同语言与 JVM 交互的路径并不相同,并且当代码到达 JVM 时,它可能不会使用相同的调用链。
  • 方法相同,但配置和/或数据分布不同。

在这种特殊情况下,第一个也是最明显的原因是您加载数据的方式。

  • sparklyrcopy_to.spark_connectionuses by default only a single partition. With such small data , as parallelization / distribution overhead can be much higher than the computation cost, but

  • 在 PySpark 中,friends 加载程序使用标准 parallelize - 这意味着分区数将使用 defaultParallelism.

    根据主配置,该值至少为 1,但它可能会受到此处不可见的配置选项的影响(如 spark.default.parallelism)。

但是,据我所知,在这种特殊情况下,这些选项应该不会影响运行时。此外,在这两种情况下,代码到达 JVM 后端之前的路径差异似乎不足以解释差异。

这表明问题出在配置的某处。一般来说,至少有两个选项会显着影响数据分布,从而影响执行时间:

  • spark.default.parallelism - 与 RDD API 一起使用以确定不同情况下的分区数,包括默认的 post-shuffle 分布。有关可能的影响,请参见

    它看起来不会影响你这里的代码。

  • spark.sql.shuffle.partitions - 与 Dataset API 一起使用以确定洗牌后的分区数(groupByjoin、 ETC。)。

    虽然 PageRank 代码使用旧的 GraphX API,并且此参数不能直接应用于那里,但在数据传递给旧的 API、involves indexing edges and verticesDataset 之前API.

    如果您查看源代码,您会发现 indexedEdges and indexVertices 都使用连接,因此依赖于 spark.sql.shuffle.partitions

    此外,上述方法设置的分区数将被GraphX Graph对象继承,显着影响执行时间。

    如果将 spark.sql.shuffle.partitions 设置为最小值:

    spark: SparkSession
    spark.conf.set("spark.sql.shuffle.partitions", 1)
    

    如此小的数据的执行时间应该可以忽略不计。

结论:

您的环境可能会使用不同的 spark.sql.shuffle.partitions 值。

一般路线:

如果您看到这样的行为,并且想粗略地缩小问题范围,您应该查看 Spark UI,看看哪里有分歧。在这种情况下,您可能会看到明显不同的任务数。