Sparklyr 的 spark_apply 函数似乎在单个执行程序上 运行 并且在中等大型数据集上失败

Sparklyr's spark_apply function seems to run on single executor and fails on moderately-large dataset

我正在尝试在 Spark table 上使用 spark_apply 到 运行 下面的 R 函数。如果我的输入 table 很小(例如 5,000 行),这可以正常工作,但是当 table 中等大(例如 5,000,000 行)时,大约 30 分钟后会抛出错误: sparklyr worker rscript failure, check worker logs for details

查看 Spark UI 表明只有一个任务被创建,并且一个执行器被应用到这个任务。

任何人都可以就为什么这个函数对于 500 万行数据集失败提出建议吗?问题可能是让一个执行者完成所有工作,但失败了?

# Create data and copy to Spark
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table
                 string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000))
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE)

# Write function to return dataframe with strings split out
myFunction <- function(inputdf){
  inputdf$string_categories <- as.character(inputdf$string_categories)
  inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories))
  stringCategoriesList <- strsplit(inputdf$string_categories, ' ')
  outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))),
                  string_categories=unlist(stringCategoriesList))
 return(outDF)
}

# Use spark_apply to run function in Spark
outtbl <- testtbl %>%
  spark_apply(myFunction,
          names=c('string_id', 'string_categories'))
outtbl
  1. sparklyr worker rscript failure, check worker logs for details错误是driver节点写的,指出实际错误需要在worker日志中查找。通常,可以通过在 Spark UI 中的执行程序选项卡中打开 stdout 来访问工作日志;日志应包含 RScript: 个条目,描述执行程序正在处理的内容以及错误的具体情况。

  2. 关于正在创建的单个任务,当columns没有指定类型在spark_apply()时,它需要计算结果的一个子集来猜测列类型,为避免这种情况,按如下方式传递显式列类型:

    outtbl <- testtbl %>% spark_apply( myFunction, columns=list( string_id = "character", string_categories = "character"))

  3. 如果使用 sparklyr 0.6.3,请更新为 sparklyr 0.6.4devtools::install_github("rstudio/sparklyr"),因为 sparklyr 0.6.3 在某些包分发的边缘情况下包含不正确的等待时间已启用且每个节点中有多个执行程序运行s。

  4. 高负载下,运行内存不足是常事。增加分区数可以解决此问题,因为它会减少处理此数据集所需的总内存。尝试 运行将其设置为:

    testtbl %>% sdf_repartition(1000) %>% spark_apply(myFunction, names=c('string_id', 'string_categories'))

  5. 也有可能是因为函数中的逻辑,函数对某些分区抛出了异常,你可以通过tryCatch()来查看是否是这种情况忽略错误,然后找出哪些是缺失值,以及为什么函数会因这些值而失败。我将从以下内容开始:

    myFunction <- function(inputdf){ tryCatch({ inputdf$string_categories <- as.character(inputdf$string_categories) inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) stringCategoriesList <- strsplit(inputdf$string_categories, ' ') outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), string_categories=unlist(stringCategoriesList)) return(outDF) }, error = function(e) { return( data.frame(string_id = c(0), string_categories = c("error")) ) }) }