Using parallelMap Package with Custom Filter in mlr

I am working on a text classification task with mlr. I have written a custom filter as described here:

Create Custom Filters

The filter works as expected, but when I try to use parallelization I receive the following error:

Exporting objects to slaves for mode socket: .mlr.slave.options
Mapping in parallel: mode = socket; cpus = 4; elements = 2.
Error in stopWithJobErrorMessages(inds, vcapply(result.list[inds], as.character)) : 
  Errors occurred in 2 slave jobs, displaying at most 10 of them:

00001: Error in parallel:::.slaveRSOCK() : 
  Assertion on 'method' failed: Must be element of set {'anova.test','carscore','cforest.importance','chi.squared','gain.ratio','information.gain','kruskal.test','linear.correlation','mrmr','oneR','permutation.importance','randomForest.importance','randomForestSRC.rfsrc','randomForestSRC.var.select','rank.correlation','relief','rf.importance','rf.min.depth','symmetrical.uncertainty','univariate','univariate.model.score','variance'}.

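I assume from the error that my custom filter needs to be an element of that set to stand any chance of working in parallel, but I haven't managed to work out (a) whether this is possible, and (b) if it is, how to do it.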

Thanks in advance for any help, Azam

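Added: test script. Due to its sensitivity I can't show you the actual script/data I'm working with, but this example reproduces the error I see. Apart from the custom feature selection and the data set, the steps to set up the learner and evaluate it are the same as in my 'real' script. As in my real case, if you remove the parallelStartSocket() command, the script runs as expected.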

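I should also add that I have successfully used parallel processing (or at least received no errors) when tuning the hyperparameters of an SVM with an RBF kernel: the script is identical apart from the makeParamSet() definition.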

library(parallelMap)
library(mlr)
library(kernlab)

makeFilter(
  name = "nonsense.filter",
  desc = "Calculates scores according to alphabetical order of features",
  pkg = "mlr",
  supported.tasks = c("classif", "regr", "surv"),
  supported.features = c("numerics", "factors", "ordered"),
  fun = function(task, nselect, decreasing = TRUE, ...) {
    feats = getTaskFeatureNames(task)
    imp = order(feats, decreasing = decreasing)
    names(imp) = feats
    imp
  }
)

# set up svm with rbf kernel
svm.lrn <- makeLearner("classif.ksvm",predict.type = "response")  

# wrap learner with filter
svm.lrn <- makeFilterWrapper(svm.lrn, fw.method = "nonsense.filter")

# define feature selection parameters 

ps.svm = makeParamSet(
  makeDiscreteParam("fw.abs", values = seq(2, 3, 1)) 

)

# define inner search and evaluation strategy
ctrl.svm = makeTuneControlGrid()
inner.svm = makeResampleDesc("CV", iters = 5, stratify = TRUE)

svm.lrn <- makeTuneWrapper(svm.lrn, resampling = inner.svm, par.set = ps.svm, 
                           control = ctrl.svm)

# set up outer resampling
outer.svm <-  makeResampleDesc("CV", iters = 10, stratify = TRUE)

# run it...

parallelStartSocket(2)

run.svm <- resample(svm.lrn, iris.task, 
                    resampling = outer.svm, extract = getTuneResult)

parallelStop()

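The problem is that makeFilter registers S3 methods, which aren't available in separate R processes. You have two options to make this work: either simply use parallelStartMulticore(2), so that the workers are forked from the current R session and inherit everything defined in it, or tell parallelMap about the pieces that need to be present in the other R processes.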

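The latter has two parts. First, use parallelLibrary("mlr") to load mlr everywhere. Second, pull the definition of the filter out into a separate file that can be loaded with parallelSource(). For example: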

filter.R:

makeFilter(
  name = "nonsense.filter",
  desc = "Calculates scores according to alphabetical order of features",
  pkg = "mlr",
  supported.tasks = c("classif", "regr", "surv"),
  supported.features = c("numerics", "factors", "ordered"),
  fun = function(task, nselect, decreasing = TRUE, ...) {
    feats = getTaskFeatureNames(task)
    imp = order(feats, decreasing = decreasing)
    names(imp) = feats
    imp
  }
)

main.R:

library(parallelMap)
library(mlr)
library(kernlab)

parallelStartSocket(2)

parallelLibrary("mlr")
parallelSource("filter.R")

# set up svm with rbf kernel
svm.lrn = makeLearner("classif.ksvm",predict.type = "response")  

# wrap learner with filter
svm.lrn = makeFilterWrapper(svm.lrn, fw.method = "nonsense.filter")

# define feature selection parameters 

ps.svm = makeParamSet(
  makeDiscreteParam("fw.abs", values = seq(2, 3, 1)) 

)

# define inner search and evaluation strategy
ctrl.svm = makeTuneControlGrid()
inner.svm = makeResampleDesc("CV", iters = 5, stratify = TRUE)

svm.lrn = makeTuneWrapper(svm.lrn, resampling = inner.svm, par.set = ps.svm, 
                           control = ctrl.svm)

# set up outer resampling
outer.svm =  makeResampleDesc("CV", iters = 10, stratify = TRUE)

# run it...
run.svm = resample(svm.lrn, iris.task, resampling = outer.svm, extract = getTuneResult)

parallelStop()
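
For completeness, the first option (multicore mode) might look roughly like the sketch below. It reuses the filter and learner setup from the question and only swaps the start call. It assumes a Unix-like system, since multicore mode relies on forking and is not available on Windows, and it has not been checked against every mlr version.

```r
library(parallelMap)
library(mlr)
library(kernlab)

# Register the custom filter in the master session (same definition as in filter.R).
makeFilter(
  name = "nonsense.filter",
  desc = "Calculates scores according to alphabetical order of features",
  pkg = "mlr",
  supported.tasks = c("classif", "regr", "surv"),
  supported.features = c("numerics", "factors", "ordered"),
  fun = function(task, nselect, decreasing = TRUE, ...) {
    feats = getTaskFeatureNames(task)
    imp = order(feats, decreasing = decreasing)
    names(imp) = feats
    imp
  }
)

# Assumption: Unix-like OS. Workers are forked from this session,
# so they see the filter registered above without any extra export step.
parallelStartMulticore(2)

# Same learner, filter wrapper and tuning setup as in the question.
svm.lrn = makeLearner("classif.ksvm", predict.type = "response")
svm.lrn = makeFilterWrapper(svm.lrn, fw.method = "nonsense.filter")

ps.svm = makeParamSet(
  makeDiscreteParam("fw.abs", values = seq(2, 3, 1))
)

ctrl.svm = makeTuneControlGrid()
inner.svm = makeResampleDesc("CV", iters = 5, stratify = TRUE)
svm.lrn = makeTuneWrapper(svm.lrn, resampling = inner.svm, par.set = ps.svm,
                          control = ctrl.svm)

outer.svm = makeResampleDesc("CV", iters = 10, stratify = TRUE)
run.svm = resample(svm.lrn, iris.task, resampling = outer.svm, extract = getTuneResult)

parallelStop()
```

If you need socket mode (for example on Windows), the parallelLibrary() / parallelSource() approach shown in main.R is the one to use.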