使用 data.table 或并行化加速 R 中的随机马尔可夫链

Speed up random Markov Chain in R using data.table or parellelisation

我正在尝试使用 data.table 或某种形式的并行化来加速 Monte Carlo 离散时间非齐次马尔可夫链的模拟。使用随机虚拟转换矩阵 TM,我在 N 次模拟中的每一次模拟中模拟 nSteps 时间步长,并从初始状态向量 initialState 开始,记录 currentState 中的下一个更新状态。在每个时间步,I 矩阵将当前状态与转移矩阵 TM 相乘。

带循环的代码 1

nStates <- 5 #number of states
initialState <- c(rep(1/nStates, nStates)) #vector with uniform initial states
nSteps <- 10 #number of time steps
N <- 10000 #number of simulations

ind.arr <- matrix(1:(N*nSteps),ncol=nSteps, byrow=TRUE)
currentState <- vector("list",(N*(nSteps))) #collects the nSteps state vectors for each simulation

system.time(
  for (i in 1:N) {
    TM <- matrix(runif(nStates^2), ncol=nStates) #random transition matrix for each time step and each simulation
    currentState[[(ind.arr[i,1])]] <- initialState %*% (TM / rowSums(TM)) #/rowSums(TM) ensures that TM is a transition matrix
    for (t in 2:nSteps){
      TM <- matrix(runif(nStates^2), ncol=nStates)
      currentState[[(ind.arr[i,t])]] <- currentState[[(ind.arr[i,t-1])]] %*% (TM / rowSums(TM))
    }
  })

代码不是很慢,但我想知道避免N-loop是否可以加速代码。如果我将 N 循环的主体放在函数中

statefun <- function(initialState, nSteps, nStates){
  TM <- matrix(runif(nStates^2), ncol=nStates) #random transition matrix for each time step and each simulation
  currentState <- matrix(rep(NA, nSteps*nStates), ncol=nStates)
  currentState[1,] <- initialState %*% (TM / rowSums(TM)) #/rowSums(TM) ensures that TM is a transition matrix
  for (t in 2:nSteps){
    TM <- matrix(runif(nStates^2), ncol=nStates)
    currentState[t,] <- currentState[t-1,] %*% (TM / rowSums(TM))
  }
  return(currentState)
}

并使用 data.table,我得到一个错误而不是想要的结果

library(data.table)
system.time(dt <- data.table(i=1:N)[, c("s1", "s2", "s3", "s4", "s5") := list(statefun(initialState, nSteps, nStates)), by=i])

#As each simulation run is independent and the call of statefun is expensive, I was hoping that parallelisation helps to accelerate the code, but trying foreach is actually slower than where I started.  

library(foreach)
system.time(res <- foreach(i=1:N, .combine='c') %do% statefun(initialState, nSteps, nStates))

对于如何使 data.table 工作或在这种情况下使用并行化的任何评论,我都很感激。 非常感谢, 蒂姆

@ EDIT:This 一个人没有拿起函数调用的十行输出...

system.time( #does not work 
  dt <- data.table(i=1:N)[,c("s1", "s2", "s3", "s4", "s5"):=as.list(statefun(initialState, nSteps, nStates)),by=i]
)

this thread 中有一个示例可以满足您的需要。您需要使用 replicate,来自 base.

中的 lapply 函数
 replicate(N, statefun(initialState, nSteps, nStates))

如果将外部 for 循环转换为包含 10,000 个任务的 foreach 循环,性能会很差,因为任务太小了。使任务数等于工人数通常更好。这是使用 iterators 包中的 idiv 函数执行此操作的简单方法:

library(doParallel)
nw <- 4
cl <- makePSOCKcluster(nw)
registerDoParallel(cl)
nStates <- 5
initialState <- c(rep(1/nStates, nStates))
nSteps <- 10
N <- 10000

currentState <- foreach(n=idiv(N, chunks=nw), .combine='c') %dopar% {
  ind.arr <- matrix(1:(n * nSteps), ncol=nSteps, byrow=TRUE)
  cur <- vector("list", n * nSteps)
  for (i in 1:n) {
    TM <- matrix(runif(nStates^2), ncol=nStates)
    cur[[ind.arr[i,1]]] <- initialState %*% (TM / rowSums(TM))
    for (t in 2:nSteps) {
      TM <- matrix(runif(nStates^2), ncol=nStates)
      cur[[(ind.arr[i,t])]] <-
          cur[[(ind.arr[i,t-1])]] %*% (TM / rowSums(TM))
    }
  }
  cur
}

这不是简单地并行化外部 for 循环,而是在较小版本的顺序代码周围添加了一个 foreach 循环。因此,如果您找到一种改进顺序代码的方法,您可以轻松地在并行版本中使用它。您还可以通过不返回所有中间状态来获得更好的性能。