将并行 Foreach 调用导出到环境中

Exporting a Parallel Foreach Call Into an Enviornment

我想将 foreach 的输出导出到一个环境中。我正在从雅虎财经中提取时间序列数据。

library(quantmod)
library(foreach)
library(parallel)
library(doParallel)
registerDoParallel(cores=2)

hub = new.env()
tickers = c("NKE", "AAPL", "MSFT", "TSLA", "MPC", "PEP", "GIS", "MA","V", "CAT", "KHC", "AMZN", "NFLX", "GS", "MS", "BAC", "GE", "KO", "JPM", "AMAT", "ABT", "BIIB")

#I have tried 2 methods below.
#The first gives me a list of just the ticker names.
#The second puts the data into a list. I am looking for an enviornment
foreach(r = tickers, .packages = "quantmod") %dopar% lapply(r, getSymbols, env = hub)

enviro = foreach(r = tickers, .packages = "quantmod")%dopar% lapply(r, getSymbols, auto.assign = F)

class(enviro)
[1] "list"

环境应该看起来像这样(当我不在 foreach 循环中 运行 它时它工作)。

hub = new.env()
#the following line of code takes about 1 min. Just a heads up
getSymbols(tickers, env = hub)

这个问题很不清楚,但从你的问题来看,你似乎试图将输出组合到一个单一的环境中,以获得更快的速度。

现在您可能应该注意一些事情。 quantmod::getSymbols 每次调用都有相当多的开销。使用您当前的方法,您应该会看到性能下降,因为每个符号都会调用该函数。

优化

一种减少开销的方法是将每个计算拆分成块。 foreach 包依赖于 iterators 包,它允许将计算分成块,非常简单。

nworker = 2
registerDoParallel(cores = nworker)
tickers = matrix(c("NKE", "AAPL", "MSFT", "TSLA", "MPC", "PEP", "GIS", "MA","V", "CAT", "KHC", "AMZN", "NFLX", "GS", "MS", "BAC", "GE", "KO", "JPM", "AMAT", "ABT", "BIIB"), ncol = 1)
tickerIter <- iterators::iter(tickers, by = 'row', #I made a 1 column matrix, so i will iterate over each row.
                              chunksize = ceiling(length(tickers)/nworker) #Set chunk size, such that each worker gets 1 job.
)

在上面的代码中,tickerIter 现在是所有符号的迭代器,块长度为 nworker。因此每个 worker(核心)只得到一个块,我们只需要从每个 worker 导出和导入一次。 tickerIter 将作为我们在 foreach 循环中的参数而不是原始代码。 要查看迭代器如何输出到 foreach 循环,您可以尝试执行 nextElem(tickerIter),这将输出一个块。 注意 但是,您将需要重新分配迭代器,因为如果块已经使用 [=22 输出,则不会在 foreach 循环中分配块=].

融入环境

从您想将输出合并到一个环境中的问题。直接在 foreach 中执行此操作是不可能的,至少不是没有崩溃 R 会话的危险。默认情况下,Foreach 通过创建多个 R 会话、导出数据和执行提供的 code/expression 来执行并行化。因此,您必须挂钩到当前 R 会话,并通过该挂钩将变量分配给环境。不推荐这样做。

但是foreach包含一个.combine参数,可以给它一个自定义函数来组合。此外,如果该函数用于组合任意数量的输入,则使用 .multicombine = TRUE 该函数将只针对每个输出执行一次。

我不明白您为什么要专门将它们添加到集线器环境中,例如在下面的代码示例中,输出被合并到一个列表中。然后可以使用 list2env 转换列表以将输出导出到特定环境。

注意 使用 tickerIter 而不是原始代码。

output <- foreach(r = tickerIter, 
                  .combine = function(...){
                    c(...) #Combine all outputs into a list
                  }, 
                  .multicombine = TRUE, 
                  .packages = "quantmod")%dopar% {
                    currenv <- environment() 
                    getSymbols(r, currenv)
                    as.list(currenv)
                  }
#If you really want it in a specific environment, you could use: (Could also be used in .combine)
list2env(output, hub)