与 data.table 并行化

Parallelization with data.table

我有以下问题。我有一个由 (xPoints, yPoints) 描述的分段线性函数,我想快速计算——我必须一遍又一遍地计算——一长串 x 的隐含 y 值,其中 x 可能落在外面xPoints 的范围。我编写了一个函数 f_pwl 来计算隐含的 y 值,但它很慢,所以我试图并行化它的调用。但它实际上比使用 data.table := 语法慢。我将感谢通过改进我的 f_pwl 功能或实施高效并行化来加快速度的建议,因为我可以访问 20 个内核来加快速度。

这是示例代码。

    # libraries
    require(data.table) # for fread, work with large data
    require(abind)      # for abind()
    require(foreach) # for parallel processing, used with doParallel
    require(doParallel) # for parallel processing, used with foreach

    f_pwl <- function(x)  {
      temp <- as.vector( rep(NA, length = length(x)), mode = "double" )
      for (i in seq(from = 1, to = length(x), by = 1))  {
        if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
          # nothing to do, temp[i] <- NA
        } else if (x[i] == max(xPoints)) {
          # value equal max(yPoints)
          temp[i] <- max(yPoints)
        } else {
          # value is f_pwl(x)
          xIndexVector = as.logical( x[i] >= xPoints  &  abind(xPoints[2:length(xPoints)], max(xPoints)) > x[i] )
          xIndexVector_plus1 = shift( xIndexVector, n = 1, fill = FALSE, type = "lag" )
          alpha_j = (xPoints[xIndexVector_plus1] - x[i])/(xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
          temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1-alpha_j) %*% yPoints[xIndexVector_plus1]
        }
      } # end for i
      as.vector( temp, mode = "double" )
    }


    ## Main program
    xPoints <- c(4, 9, 12, 15, 18, 21)
    yPoints <- c(1, 2, 3, 4, 5, 6)

    x <- rnorm(1e4, mean = 12, sd = 5)

    dt <- as.data.table( x )
    dt[ , c("y1", "y2", "y3") := as.vector( mode = "double", NA ) ]

    # data.table := command
    system.time({
      dt[, y2 := f_pwl( x ) ]
    })

    # mapply
    system.time({
      dt[ , y1 := mapply( f_pwl, x ), by=.I  ]
    })

    # parallel
    system.time({
      #setup parallel backend to use many processors
      cores=detectCores()
      cl <- makeCluster(cores[1]-1, type="FORK") #not to overload your computer
      registerDoParallel(cl)
      dt$y3 <- foreach(i=1:nrow(dt), .combine=cbind) %dopar% {
        tempY <- f_pwl( dt$x[i] )
        tempY
      }
      #stop cluster
      stopCluster(cl)
    })

    summary( dt[ , .(y1-y2, y1-y3, y2-y3)] )   

首先,计算并存储 alpha_j 个。

然后,先按 x 排序 DT,然后在执行线性插值之前将其切割成相关的区间

alpha <- c(NA, diff(yPoints) / diff(xPoints))

DT[order(x), 
    y := alpha[.GRP] * (x - xPoints[.GRP-1L]) + yPoints[.GRP-1L], 
    by=cut(x, xPoints)]

请告诉我它的表现如何。

数据:

library(data.table)

## Main program
set.seed(27L)
xPoints <- c(4, 9, 12, 15, 18, 21)
yPoints <- c(1, 2, 3, 4, 5, 6)
DT <- data.table(x=rnorm(1e4, mean=12, sd=5))

检查:

f_pwl <- function(x)  {
    temp <- as.vector( rep(NA, length = length(x)), mode = "double" )
    for (i in seq(from = 1, to = length(x), by = 1))  {
        if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
            # nothing to do, temp[i] <- NA
        } else if (x[i] == max(xPoints)) {
            # value equal max(yPoints)
            temp[i] <- max(yPoints)
        } else {
            # value is f_pwl(x)
            xIndexVector = as.logical( x[i] >= xPoints  &  abind(xPoints[2:length(xPoints)], max(xPoints)) > x[i] )
            xIndexVector_plus1 = shift( xIndexVector, n = 1, fill = FALSE, type = "lag" )
            alpha_j = (xPoints[xIndexVector_plus1] - x[i])/(xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
            temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1-alpha_j) %*% yPoints[xIndexVector_plus1]
        }
    } # end for i
    as.vector( temp, mode = "double" )
}
system.time({
    DT[, yOP := f_pwl( x ) ]
})

DT[abs(y-yOP) > 1e-6]
#Empty data.table (0 rows) of 3 cols: x,y,yOP