How to reuse sparklyr context with mclapply?

I have R code that does some distributed data preprocessing in sparklyr, then collects the data into a local R data frame, and finally saves the result as a CSV. Everything works as expected, and now I plan to reuse the Spark context across the processing of multiple input files.

My code looks similar to this reproducible example:

library(dplyr)
library(sparklyr)

sc <- spark_connect(master = "local")

# Generate random input
dir.create("/tmp/input", showWarnings = FALSE)
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df0.csv')
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df1.csv')

# Multi-job input
input = list(
    list(name="df0", path="/tmp/input/df0.csv"),
    list(name="df1", path="/tmp/input/df1.csv")
)
global_parallelism = 2
results_dir = "/tmp/results2"
dir.create(results_dir, showWarnings = FALSE)

# Function executed on each file
f <- function (job) {
    spark_df <- spark_read_csv(sc, "df_tbl", job$path)
    local_df <- spark_df %>% 
      group_by(V1) %>%           
      summarise(n=n()) %>%
      sdf_collect

    output_path <- paste(results_dir, "/", job$name, ".csv", sep="")
    local_df %>% write.csv(output_path)
    return (output_path)
}

If I execute the function over the job inputs sequentially with lapply, everything works as expected:

> lapply(input, f)

[[1]]
[1] "/tmp/results2/df0.csv"

[[2]]
[1] "/tmp/results2/df1.csv"

However, it fails if I try to run it in parallel to make the most of the Spark context (so that, once Spark has finished with df0 and local R is still processing it, df1 can already be handled by Spark):

> library(parallel)
> library(MASS)
> mclapply(input, f, mc.cores = global_parallelism)

 *** caught segfault ***
address 0x560b2c134003, cause 'memory not mapped'
[[1]]
[1] "Error in as.vector(x, \"list\") : \n  cannot coerce type 'environment' to vector of type 'list'\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in as.vector(x, "list"): cannot coerce type 'environment' to vector of type 'list'>

[[2]]
NULL

Warning messages:
1: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 2 did not deliver a result, all values of the job will be affected
2: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 1 encountered error in user code, all values of the job will be affected

When I do something similar with Python and ThreadPoolExecutor, the Spark context is shared across threads, and the same is true for Scala and Java.

Is it possible to reuse the sparklyr context with parallel execution in R?

Unfortunately, no: the sc object, which is of class spark_connection, cannot be exported to another R process (not even with forked processing). If you use the future.apply package, part of the future ecosystem, you can see this if you run:

library(future.apply)
plan(multicore)

## Look for non-exportable objects and give an error if found
options(future.globals.onReference = "error")

y <- future_lapply(input, f)

That will throw:

Error: Detected a non-exportable reference (‘externalptr’) in one of the
globals (‘sc’ of class ‘spark_connection’) used in the future expression
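If you still need per-file parallelism, one possible workaround (only a sketch, not part of the original answer) is to stop sharing sc altogether and let each forked worker open and close its own short-lived connection. The function name f_per_worker is hypothetical, and spinning up one local Spark driver per worker costs extra startup time and memory:

library(dplyr)
library(sparklyr)
library(parallel)

# Hypothetical per-worker variant of f(): the parent's spark_connection is not
# reused; each forked child connects to its own local Spark instance instead.
f_per_worker <- function(job) {
  sc_local <- spark_connect(master = "local")
  on.exit(spark_disconnect(sc_local), add = TRUE)  # always clean up the connection

  spark_df <- spark_read_csv(sc_local, paste0(job$name, "_tbl"), job$path)
  local_df <- spark_df %>%
    group_by(V1) %>%
    summarise(n = n()) %>%
    collect()

  output_path <- paste0(results_dir, "/", job$name, ".csv")
  write.csv(local_df, output_path)
  output_path
}

mclapply(input, f_per_worker, mc.cores = global_parallelism)

Whether this pays off depends on how long the Spark stage of each file takes compared to the overhead of starting an extra connection per worker.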