R、dplyr 和 snow:如何并行化使用 dplyr 的函数

R, dplyr and snow: how to parallelize functions which use dplyr

假设我想以并行方式将 myfunction 应用到 myDataFrame 的每一行。假设 otherDataFrame 是一个包含两列的数据框:COLUNM1_odfCOLUMN2_odf 由于某些原因在 myfunction 中使用。所以我想使用 parApply 编写代码,如下所示:

clus <- makeCluster(4)
clusterExport(clus, list("myfunction","%>%"))

myfunction <- function(fst, snd) {
 #otherFunction and aGlobalDataFrame are defined in the global env
 otherFunction(aGlobalDataFrame)

 # some code to create otherDataFrame **INTERNALLY** to this function
 otherDataFrame %>% filter(COLUMN1_odf==fst & COLUMN2_odf==snd)
 return(otherDataFrame)
}
do.call(bind_rows,parApply(clus,myDataFrame,1,function(r) { myfunction(r[1],r[2]) }

这里的问题是 R 无法识别 COLUMN1_odfCOLUMN2_odf,即使我将它们插入 clusterExport。我怎么解决这个问题?有没有办法 "export" snow 需要的所有对象,以便不枚举它们中的每一个?

编辑 1:我添加了一条注释(在上面的代码中)以指定 otherDataFrame 是在 myfunction 内部创建的.

编辑 2:我添加了一些伪代码以概括 myfunction:它现在使用全局数据帧(aGlobalDataFrame 和另一个函数 otherFunction

既然我没有在我的 phone 上查看这个,我可以看到几个问题。

首先,您实际上并没有在函数中创建 otherDataFrame。您正在尝试将现有的 otherDataFrame 通过管道传输到 filter,如果环境中不存在 otherDataFrame,该函数将失败。

其次,除非您已经将 dplyr 包加载到集群环境中,否则您将调用错误的 filter 函数。

最后,当您调用 parApply 时,您没有在任何地方指定 fstsnd 应该是什么。试试以下方法:

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(magrittr)})
clusterExport(clus, "myfunction")

myfunction <- function(otherDataFrame, fst, snd) {
 dplyr::filter(otherDataFrame, COLUMN1_odf==fst & COLUMN2_odf==snd)
}
do.call(bind_rows,parApply(clus,myDataFrame,1,function(r, fst, snd) { myfunction(r[fst],r[snd]), "[fst]", "[snd]") }

做了一些实验,所以我解决了我的问题(根据本杰明的建议并考虑到我添加到问题中的 'edit'):

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(magrittr)})
clusterExport(clus, "myfunction", "otherfunction", aGlobalDataFrame)

myfunction <- function(fst, snd) {
 #otherFunction and aGlobalDataFrame are defined in the global env
 otherFunction(aGlobalDataFrame)

 # some code to create otherDataFrame **INTERNALLY** to this function
 otherDataFrame %>% dplyr::filter(COLUMN1_odf==fst & COLUMN2_odf==snd)
 return(otherDataFrame)
}

do.call(bind_rows, parApply(clus, myDataFrame, 1, 
        {function(r) { myfunction(r[1], r[2]) } )

通过这种方式,我注册了 aGlobalDataFramemyfunctionotherfunction,简而言之,用于并行化作业的所有函数和函数使用的数据 (myfunction本身)