sparklyr spark_apply 很慢

sparklyr spark_apply is very slow

sparklyr spark_apply 非常慢/根本没有响应。签入 spark UI 时,正在执行的阶段在 utils.scala:204 收集。它正在执行 0/1(1 运行) 个任务。应用 spark_apply 的数据框有 30 个分区。此任务没有任何进展以及为什么正在执行单个任务

library(sparklyr)
library(dplyr)
config=spark_config()
config=c(config, list("spark.files"="hdfs:///bundle/packages.tar","spark.dynamicAllocation.enabled"="false","spark.executor.memory"="10g","spark.executor.cores"="4","spark.executor.instances"="7"))
sc <- spark_connect(master="yarn", app_name = "demo",config = config,version="2.3.0")
demo_data <- spark_read_csv(sc,name='demo_data',path = '/data.txt',delimiter = '\t',infer_schema = FALSE, columns = list(column1 = "integer"))
spark_apply(demo_data, function(df) df * 10, packages = "packages.tar" ,columns=list(column1="integer"))

我的技巧是 运行 一个最小的 spark_apply 函数(来自 rstudio.com)在集群初始化之后:spark_apply(function(e) I(e)) 用于在每个节点上初始化 R 环境从头开始。

在书里"mastering spark with R" javier luraschi recommands using spark_apply as a last resort, even if the following explainations,表明已投入大量资金克服瓶颈,特别是箭头库的开发。

也许在这一点上他应该提到使用 sparklyr 运行 并行任务的更合适的方法(参见 Spark and Sparklyr)。

他在 github 上解释说 spark_apply 遇到了序列化问题,正如 here and here

所解释的

另一方面,randomgambit 认为性能问题是由于 Sparklyr 行为将整个 R 分布复制到每个节点。

就我而言,问题不在于本地模式, 但是在集群模式下第一次执行 spark_apply 期间:

使用来自rstudio.com的测试命令 基准测试为 运行 这个表达式

提供了以下性能
sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e)) 
  • 本地模式:

    Unit: seconds min lq mean median uq max neval 5.043947 5.043947 5.043947 5.043947 5.043947 5.043947 1

  • 集群模式(一主一工):

    • 第一次执行

    Unit: seconds min lq mean median uq max neval 928.0637 928.0637 928.0637 928.0637 928.0637 928.0637 1

    • 第二次执行

    Unit: seconds min lq mean median uq max neval 4.309775 4.309775 4.309775 4.309775 4.309775 4.309775 1

鉴于第二次执行速度要快得多,我认为Sparklyr 在每个节点上复制整个R 分布需要923 秒= 15 分钟,23 秒

这里是使用的代码:

library(dplyr) 
library(sparklyr) 
library(microbenchmark)
sc <- spark_connect(master = "local")

microbenchmark(
  sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
  ,times = 1L)    

conf <- spark_config()
conf[["spark.r.command"]] <- "d:/path_to/R-3.6.1/bin/Rscript.exe"
sc <- spark_connect(master="spark://192.168.0.12:7077", 
                                        version = "2.4.3",
                                        spark_home = "C:\Users\username\AppData\Local\spark\spark-2.4.3-bin-hadoop2.7",
                                        config = conf)
microbenchmark(
  sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
  ,times = 1L)
microbenchmark(
  sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
  ,times = 1L)