如何使用 mclapply 重用 sparklyr 上下文?
How to reuse sparklyr context with mclapply?
我有一个 R 代码,它在 sparklyr
中做一些分布式数据预处理,然后将数据收集到 R 本地数据帧,最后将结果保存在 CSV 中。一切都按预期工作,现在我计划在多个输入文件处理中重新使用 spark 上下文。
我的代码看起来类似于这个可重现的示例:
library(dplyr)
library(sparklyr)
sc <- spark_connect(master = "local")
# Generate random input
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df0.csv')
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df1.csv')
# Multi-job input
input = list(
list(name="df0", path="/tmp/input/df0.csv"),
list(name="df1", path="/tmp/input/df1.csv")
)
global_parallelism = 2
results_dir = "/tmp/results2"
# Function executed on each file
f <- function (job) {
spark_df <- spark_read_csv(sc, "df_tbl", job$path)
local_df <- spark_df %>%
group_by(V1) %>%
summarise(n=n()) %>%
sdf_collect
output_path <- paste(results_dir, "/", job$name, ".csv", sep="")
local_df %>% write.csv(output_path)
return (output_path)
}
如果我使用 lapply
按顺序执行作业输入的功能,一切都按预期工作:
> lapply(input, f)
[[1]]
[1] "/tmp/results2/df0.csv"
[[2]]
[1] "/tmp/results2/df1.csv"
但是,如果我计划并行 运行 它以最大限度地利用 spark 上下文(如果 df0
spark 处理完成并且本地 R 正在处理它,df1
可以已经被 spark 处理):
> library(parallel)
> library(MASS)
> mclapply(input, f, mc.cores = global_parallelism)
*** caught segfault ***
address 0x560b2c134003, cause 'memory not mapped'
[[1]]
[1] "Error in as.vector(x, \"list\") : \n cannot coerce type 'environment' to vector of type 'list'\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in as.vector(x, "list"): cannot coerce type 'environment' to vector of type 'list'>
[[2]]
NULL
Warning messages:
1: In mclapply(input, f, mc.cores = global_parallelism) :
scheduled core 2 did not deliver a result, all values of the job will be affected
2: In mclapply(input, f, mc.cores = global_parallelism) :
scheduled core 1 encountered error in user code, all values of the job will be affected
当我对 Python 和 ThreadPoolExcutor
执行类似操作时,spark 上下文在线程之间共享,对于 Scala 和 Java 也是如此。
是否可以在 R 中的并行执行中重用 sparklyr 上下文?
是的,不幸的是,class spark_connection
的 sc
对象无法导出到另一个 R 进程(即使使用分叉处理)。如果你使用 future.apply package, part of the future 生态系统,你可以看到这个如果你使用:
library(future.apply)
plan(multicore)
## Look for non-exportable objects and given an error if found
options(future.globals.onReference = "error")
y <- future_lapply(input, f)
那会抛出:
Error: Detected a non-exportable reference (‘externalptr’) in one of the
globals (‘sc’ of class ‘spark_connection’) used in the future expression
我有一个 R 代码,它在 sparklyr
中做一些分布式数据预处理,然后将数据收集到 R 本地数据帧,最后将结果保存在 CSV 中。一切都按预期工作,现在我计划在多个输入文件处理中重新使用 spark 上下文。
我的代码看起来类似于这个可重现的示例:
library(dplyr)
library(sparklyr)
sc <- spark_connect(master = "local")
# Generate random input
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df0.csv')
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df1.csv')
# Multi-job input
input = list(
list(name="df0", path="/tmp/input/df0.csv"),
list(name="df1", path="/tmp/input/df1.csv")
)
global_parallelism = 2
results_dir = "/tmp/results2"
# Function executed on each file
f <- function (job) {
spark_df <- spark_read_csv(sc, "df_tbl", job$path)
local_df <- spark_df %>%
group_by(V1) %>%
summarise(n=n()) %>%
sdf_collect
output_path <- paste(results_dir, "/", job$name, ".csv", sep="")
local_df %>% write.csv(output_path)
return (output_path)
}
如果我使用 lapply
按顺序执行作业输入的功能,一切都按预期工作:
> lapply(input, f)
[[1]]
[1] "/tmp/results2/df0.csv"
[[2]]
[1] "/tmp/results2/df1.csv"
但是,如果我计划并行 运行 它以最大限度地利用 spark 上下文(如果 df0
spark 处理完成并且本地 R 正在处理它,df1
可以已经被 spark 处理):
> library(parallel)
> library(MASS)
> mclapply(input, f, mc.cores = global_parallelism)
*** caught segfault ***
address 0x560b2c134003, cause 'memory not mapped'
[[1]]
[1] "Error in as.vector(x, \"list\") : \n cannot coerce type 'environment' to vector of type 'list'\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in as.vector(x, "list"): cannot coerce type 'environment' to vector of type 'list'>
[[2]]
NULL
Warning messages:
1: In mclapply(input, f, mc.cores = global_parallelism) :
scheduled core 2 did not deliver a result, all values of the job will be affected
2: In mclapply(input, f, mc.cores = global_parallelism) :
scheduled core 1 encountered error in user code, all values of the job will be affected
当我对 Python 和 ThreadPoolExcutor
执行类似操作时,spark 上下文在线程之间共享,对于 Scala 和 Java 也是如此。
是否可以在 R 中的并行执行中重用 sparklyr 上下文?
是的,不幸的是,class spark_connection
的 sc
对象无法导出到另一个 R 进程(即使使用分叉处理)。如果你使用 future.apply package, part of the future 生态系统,你可以看到这个如果你使用:
library(future.apply)
plan(multicore)
## Look for non-exportable objects and given an error if found
options(future.globals.onReference = "error")
y <- future_lapply(input, f)
那会抛出:
Error: Detected a non-exportable reference (‘externalptr’) in one of the
globals (‘sc’ of class ‘spark_connection’) used in the future expression