库加载未扩展到 Spark 中的函数

Library load not extending to functions in Spark

我有一个 SparkR 脚本,它加载了一些库,还定义了来自其他源脚本的函数。它的头部是这样的:

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))) 
library(devtools)   
library(JC.utilities)         # user-created

source("path/to/clean_data.R")

其中 clean_data.R 使用了 JM.utilities 库中的一些函数,例如:

clean_data <- function(df) {
  ... 
  return(JC.utilities::reformat(df))
}

在基础 R 中,这没有问题,因为通过采购定义此函数的 R 脚本应该意味着库和函数现在处于同一范围内。但是在 Spark 中,我在调用 clean_data():

时收到此错误消息
Error in loadNamespace(name) : there is no package called ‘JM.utilities’

两个问题:

  1. 为什么 Spark 似乎不记得我已经加载了这个库?
  2. 有没有办法解决这个问题而不必明确定义一堆东西?

您在驱动程序上加载了库,但没有在处理单独工作区的执行程序上加载库。

您需要在应用于数据框的函数中导入 JC.utilities。

如果执行器上根本没有安装该库,您将需要安装它,或者您可以尝试安装 this

我设计了一个最小的例子来说明我对库 stringr 及其函数的观点 str_length:

> d<-data.frame(a=c("a", "abcd", "bb"), b=c(1,2,3))
> df <- as.DataFrame(d)

让我们尝试 dapply 看看它是否适用于基本 R 代码:

> dapplyCollect(df, function(x) {cbind(x$a, x$b, x$b*x$b+1) })
    [,1]   [,2] [,3] 
[1,] "a"    "1"  "2"  
[2,] "abcd" "2"  "5"  
[3,] "bb"   "3"  "10" 

有效!然后让我们尝试计算第一列的字符数(我可以使用 nchar 但我的想法是演示如何使用库)。

> import(stringr)
> str_length("abcdef")
[1] 4

它在驱动程序中工作正常。让我们试试火花:

> dapplyCollect(df, function(x) {cbind(x$a, x$b, x$b*x$b+1, str_length(x$a)) })
17/11/08 18:55:17 ERROR executor.Executor: Exception in task 0.0 in stage 10.0 (TID 10)
org.apache.spark.SparkException: R computation failed with
 Error in cbind(x$a, x$b, x$b * x$b + 1, str_length(x$a)) : 
  could not find function "str_length"

正如我所解释的,在驱动程序中导入库不会在工作程序中导入它。但这就是我需要它的地方。现在让我们在 dapply 中导入库:

> dapplyCollect(df, function(x) {library(stringr); cbind(x$a, x$b, x$b*x$b+1, str_length(x$a)) })
     [,1]   [,2] [,3] [,4]
[1,] "a"    "1"  "2"  "1" 
[2,] "abcd" "2"  "5"  "4" 
[3,] "bb"   "3"  "10" "2"

好了。