使用 data.table 或并行化加速 R 中的随机马尔可夫链
Speed up random Markov Chain in R using data.table or parellelisation
我正在尝试使用 data.table 或某种形式的并行化来加速 Monte Carlo 离散时间非齐次马尔可夫链的模拟。使用随机虚拟转换矩阵 TM,我在 N 次模拟中的每一次模拟中模拟 nSteps 时间步长,并从初始状态向量 initialState 开始,记录 currentState 中的下一个更新状态。在每个时间步,I 矩阵将当前状态与转移矩阵 TM 相乘。
带循环的代码 1
nStates <- 5 #number of states
initialState <- c(rep(1/nStates, nStates)) #vector with uniform initial states
nSteps <- 10 #number of time steps
N <- 10000 #number of simulations
ind.arr <- matrix(1:(N*nSteps),ncol=nSteps, byrow=TRUE)
currentState <- vector("list",(N*(nSteps))) #collects the nSteps state vectors for each simulation
system.time(
for (i in 1:N) {
TM <- matrix(runif(nStates^2), ncol=nStates) #random transition matrix for each time step and each simulation
currentState[[(ind.arr[i,1])]] <- initialState %*% (TM / rowSums(TM)) #/rowSums(TM) ensures that TM is a transition matrix
for (t in 2:nSteps){
TM <- matrix(runif(nStates^2), ncol=nStates)
currentState[[(ind.arr[i,t])]] <- currentState[[(ind.arr[i,t-1])]] %*% (TM / rowSums(TM))
}
})
代码不是很慢,但我想知道避免N-loop是否可以加速代码。如果我将 N 循环的主体放在函数中
statefun <- function(initialState, nSteps, nStates){
TM <- matrix(runif(nStates^2), ncol=nStates) #random transition matrix for each time step and each simulation
currentState <- matrix(rep(NA, nSteps*nStates), ncol=nStates)
currentState[1,] <- initialState %*% (TM / rowSums(TM)) #/rowSums(TM) ensures that TM is a transition matrix
for (t in 2:nSteps){
TM <- matrix(runif(nStates^2), ncol=nStates)
currentState[t,] <- currentState[t-1,] %*% (TM / rowSums(TM))
}
return(currentState)
}
并使用 data.table,我得到一个错误而不是想要的结果
library(data.table)
system.time(dt <- data.table(i=1:N)[, c("s1", "s2", "s3", "s4", "s5") := list(statefun(initialState, nSteps, nStates)), by=i])
#As each simulation run is independent and the call of statefun is expensive, I was hoping that parallelisation helps to accelerate the code, but trying foreach is actually slower than where I started.
library(foreach)
system.time(res <- foreach(i=1:N, .combine='c') %do% statefun(initialState, nSteps, nStates))
对于如何使 data.table 工作或在这种情况下使用并行化的任何评论,我都很感激。
非常感谢,
蒂姆
@ EDIT:This 一个人没有拿起函数调用的十行输出...
system.time( #does not work
dt <- data.table(i=1:N)[,c("s1", "s2", "s3", "s4", "s5"):=as.list(statefun(initialState, nSteps, nStates)),by=i]
)
this thread 中有一个示例可以满足您的需要。您需要使用 replicate,来自 base.
中的 lapply 函数
replicate(N, statefun(initialState, nSteps, nStates))
如果将外部 for 循环转换为包含 10,000 个任务的 foreach 循环,性能会很差,因为任务太小了。使任务数等于工人数通常更好。这是使用 iterators
包中的 idiv
函数执行此操作的简单方法:
library(doParallel)
nw <- 4
cl <- makePSOCKcluster(nw)
registerDoParallel(cl)
nStates <- 5
initialState <- c(rep(1/nStates, nStates))
nSteps <- 10
N <- 10000
currentState <- foreach(n=idiv(N, chunks=nw), .combine='c') %dopar% {
ind.arr <- matrix(1:(n * nSteps), ncol=nSteps, byrow=TRUE)
cur <- vector("list", n * nSteps)
for (i in 1:n) {
TM <- matrix(runif(nStates^2), ncol=nStates)
cur[[ind.arr[i,1]]] <- initialState %*% (TM / rowSums(TM))
for (t in 2:nSteps) {
TM <- matrix(runif(nStates^2), ncol=nStates)
cur[[(ind.arr[i,t])]] <-
cur[[(ind.arr[i,t-1])]] %*% (TM / rowSums(TM))
}
}
cur
}
这不是简单地并行化外部 for 循环,而是在较小版本的顺序代码周围添加了一个 foreach 循环。因此,如果您找到一种改进顺序代码的方法,您可以轻松地在并行版本中使用它。您还可以通过不返回所有中间状态来获得更好的性能。
我正在尝试使用 data.table 或某种形式的并行化来加速 Monte Carlo 离散时间非齐次马尔可夫链的模拟。使用随机虚拟转换矩阵 TM,我在 N 次模拟中的每一次模拟中模拟 nSteps 时间步长,并从初始状态向量 initialState 开始,记录 currentState 中的下一个更新状态。在每个时间步,I 矩阵将当前状态与转移矩阵 TM 相乘。
带循环的代码 1
nStates <- 5 #number of states
initialState <- c(rep(1/nStates, nStates)) #vector with uniform initial states
nSteps <- 10 #number of time steps
N <- 10000 #number of simulations
ind.arr <- matrix(1:(N*nSteps),ncol=nSteps, byrow=TRUE)
currentState <- vector("list",(N*(nSteps))) #collects the nSteps state vectors for each simulation
system.time(
for (i in 1:N) {
TM <- matrix(runif(nStates^2), ncol=nStates) #random transition matrix for each time step and each simulation
currentState[[(ind.arr[i,1])]] <- initialState %*% (TM / rowSums(TM)) #/rowSums(TM) ensures that TM is a transition matrix
for (t in 2:nSteps){
TM <- matrix(runif(nStates^2), ncol=nStates)
currentState[[(ind.arr[i,t])]] <- currentState[[(ind.arr[i,t-1])]] %*% (TM / rowSums(TM))
}
})
代码不是很慢,但我想知道避免N-loop是否可以加速代码。如果我将 N 循环的主体放在函数中
statefun <- function(initialState, nSteps, nStates){
TM <- matrix(runif(nStates^2), ncol=nStates) #random transition matrix for each time step and each simulation
currentState <- matrix(rep(NA, nSteps*nStates), ncol=nStates)
currentState[1,] <- initialState %*% (TM / rowSums(TM)) #/rowSums(TM) ensures that TM is a transition matrix
for (t in 2:nSteps){
TM <- matrix(runif(nStates^2), ncol=nStates)
currentState[t,] <- currentState[t-1,] %*% (TM / rowSums(TM))
}
return(currentState)
}
并使用 data.table,我得到一个错误而不是想要的结果
library(data.table)
system.time(dt <- data.table(i=1:N)[, c("s1", "s2", "s3", "s4", "s5") := list(statefun(initialState, nSteps, nStates)), by=i])
#As each simulation run is independent and the call of statefun is expensive, I was hoping that parallelisation helps to accelerate the code, but trying foreach is actually slower than where I started.
library(foreach)
system.time(res <- foreach(i=1:N, .combine='c') %do% statefun(initialState, nSteps, nStates))
对于如何使 data.table 工作或在这种情况下使用并行化的任何评论,我都很感激。 非常感谢, 蒂姆
@ EDIT:This 一个人没有拿起函数调用的十行输出...
system.time( #does not work
dt <- data.table(i=1:N)[,c("s1", "s2", "s3", "s4", "s5"):=as.list(statefun(initialState, nSteps, nStates)),by=i]
)
this thread 中有一个示例可以满足您的需要。您需要使用 replicate,来自 base.
中的 lapply 函数 replicate(N, statefun(initialState, nSteps, nStates))
如果将外部 for 循环转换为包含 10,000 个任务的 foreach 循环,性能会很差,因为任务太小了。使任务数等于工人数通常更好。这是使用 iterators
包中的 idiv
函数执行此操作的简单方法:
library(doParallel)
nw <- 4
cl <- makePSOCKcluster(nw)
registerDoParallel(cl)
nStates <- 5
initialState <- c(rep(1/nStates, nStates))
nSteps <- 10
N <- 10000
currentState <- foreach(n=idiv(N, chunks=nw), .combine='c') %dopar% {
ind.arr <- matrix(1:(n * nSteps), ncol=nSteps, byrow=TRUE)
cur <- vector("list", n * nSteps)
for (i in 1:n) {
TM <- matrix(runif(nStates^2), ncol=nStates)
cur[[ind.arr[i,1]]] <- initialState %*% (TM / rowSums(TM))
for (t in 2:nSteps) {
TM <- matrix(runif(nStates^2), ncol=nStates)
cur[[(ind.arr[i,t])]] <-
cur[[(ind.arr[i,t-1])]] %*% (TM / rowSums(TM))
}
}
cur
}
这不是简单地并行化外部 for 循环,而是在较小版本的顺序代码周围添加了一个 foreach 循环。因此,如果您找到一种改进顺序代码的方法,您可以轻松地在并行版本中使用它。您还可以通过不返回所有中间状态来获得更好的性能。