使用 RStorm 计算多个 data.table 列的 Welford 方差

Computing Welford's variance for multiple data.table columns using RStorm

给定以下 data.table dt:

    i a  b
 1: 1 1 NA
 2: 2 1 NA
 3: 2 2  2
 4: 3 1  1
 5: 3 2  2
 6: 3 3 NA
 7: 4 1 NA
 8: 4 2  2
 9: 4 3  3
10: 4 4 NA

我想使用 Welford's Method and the RStorm package facilities. I followed along the example on page 4 of RStorm's vignette and read through an introductory paper on RStorm 计算按列 i 分组的 ab 列的 运行 方差,但我无法计算弄清楚如何让它发挥作用。这是我的代码:

library(RStorm)
dt = data.table(i=c(1,2,2,3,3,3,4,4,4,4), a=c(1,1:2,1:3,1:4), b=c(NA,NA,2,1,2,NA,NA,2,3,NA)
in_cols = c('a','b')
out_cols <- paste0(in_cols, '.var.Welford')
## Calculaing variance using Welford's method
## See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
## See: "RStorm: Developing and Testing Streaming Algorithms in R", R Journal Vol 6/1
var.Welford <- function(x, ...) {
    x <- as.numeric(x[1])
    params <- GetHash("params2")
    if (!is.data.frame(params)) {
        params <- list()
        params$M <- params$S <- params$n <- 0
    }
    x <- ifelse(is.na(x), params$M, x)
    n <- params$n + 1
    delta <- (x - params$M)
    M <- params$M + ( delta / (n + 1) )
    S <- params$S + delta*(x - M)
    SetHash("params2", data.frame(n=n,M=M,S=S))
    var <- ifelse(n > 1, S / (n-1), 0)
    TrackRow("var.Welford", data.frame(var = var))
}
computeVarWelford <- function(x) {
    topology <- Topology(as.data.frame(x=as.data.frame(x)))
    topology <- AddBolt(topology, Bolt(var.Welford, listen = 0))
    result <- RStorm(topology)
    # GetTrack('var.Welford', result)
    result$track$var.Welford
}

## Execute:
dt[, eval(out_cols) := lapply(.SD, function(x) {return(as.list(computeVarWelford(x))[1])})
, by=i, .SDcols = in_cols]

执行上面的行将 dt 转换为:

    i a  b                       a.var.Welford                       b.var.Welford
 1: 1 1 NA                                   0                                   0
 2: 2 1 NA                                 0,2                   0.000000,2.666667
 3: 2 2  2                                 0,2                   0.000000,2.666667
 4: 3 1  1                         0.0,2.0,2.5                               0,2,1
 5: 3 2  2                         0.0,2.0,2.5                               0,2,1
 6: 3 3 NA                         0.0,2.0,2.5                               0,2,1
 7: 4 1 NA 0.000000,2.000000,2.500000,3.333333 0.000000,2.666667,3.375000,2.250000
 8: 4 2  2 0.000000,2.000000,2.500000,3.333333 0.000000,2.666667,3.375000,2.250000
 9: 4 3  3 0.000000,2.000000,2.500000,3.333333 0.000000,2.666667,3.375000,2.250000
10: 4 4 NA 0.000000,2.000000,2.500000,3.333333 0.000000,2.666667,3.375000,2.250000

从结果中可以清楚地看出,每个(列,组)对的整个方差列表都被复制到该(列,组)对的每个元素中,而不是映射到该(列,组)对的所有元素。这才是我真正想要的:

    i a  b     a.var.Welford        b.var.Welford
 1: 1 1 NA     0                    0
 2: 2 1 NA     0                    0
 3: 2 2  2     2                    2.666667
 4: 3 1  1     0.0                  0
 5: 3 2  2     2.0                  2
 6: 3 3 NA     2.5                  1
 7: 4 1 NA     0.000000             0.000000
 8: 4 2  2     2.000000             2.666667
 9: 4 3  3     2.500000             3.375000
10: 4 4 NA     3.333333             2.250000

我真的希望有一个简单的解决方法,但我一直没能弄清楚。每次我尝试我认为应该起作用的方法时,我最终都会收到 data.table 的错误消息

All items in j=list(...) should be atomic vectors or lists. If you are trying something like j=list(.SD,newcol=mean(colA)) then use := by group instead (much quicker), or cbind or merge afterwards.

我的理解是,我在 lapply(.SD, FUN) 代码中尝试的任何 FUN 的 return 值的维度与 data.table 需要该组的专栏。

非常感谢任何帮助。

编辑:好的,解决方案非常简单。我觉得我好笨。但这是以后可能需要它的人的答案

## Make sure to use [[]] at the end. My problem came entirely down to using [].
dt[, eval(out_cols) := lapply(.SD, function(x) {return(as.list(computeVarWelford(x))[[1]])})
   , by=i, .SDcols = in_cols]

这很有魅力。我得到了我需要的东西:

    i a  b a.var.Welford b.var.Welford
 1: 1 1 NA      0.000000      0.000000
 2: 2 1 NA      0.000000      0.000000
 3: 2 2  2      2.000000      2.666667
 4: 3 1  1      0.000000      0.000000
 5: 3 2  2      2.000000      2.000000
 6: 3 3 NA      2.500000      1.000000
 7: 4 1 NA      0.000000      0.000000
 8: 4 2  2      2.000000      2.666667
 9: 4 3  3      2.500000      3.375000
10: 4 4 NA      3.333333      2.250000

编辑:我不再需要下面的 hack 解决方案。这是解决此问题的代码(注意 [[]] 而不是 [] 修复):

dt[, eval(out_cols) := lapply(.SD, function(x) {return(as.list(computeVarWelford(x))[[1]])})
   , by=i, .SDcols = in_cols]

OLD:好的,所以我终于想出了一个办法让它发挥作用。但我觉得这条路很丑。我暂时接受这个作为我的答案,但是如果有人有更好的解决方案,我很乐意听到它,如果它比我的更好,我会接受它作为这个问题的答案。

解决方案:

out_cols_fixed <- paste0(out_cols, '.fixed')
dt[,eval(out_cols_fixed) := lapply(.SD, function(x) { return(x[1][[1]]) }), by=i, .SDcols = out_cols]
dt[,eval(out_cols) := NULL]
setnames(dt, old = out_cols_fixed, new = out_cols)

dt 所需的结果:

    i a  b a.var.Welford b.var.Welford
 1: 1 1 NA      0.000000      0.000000
 2: 2 1 NA      0.000000      0.000000
 3: 2 2  2      2.000000      2.666667
 4: 3 1  1      0.000000      0.000000
 5: 3 2  2      2.000000      2.000000
 6: 3 3 NA      2.500000      1.000000
 7: 4 1 NA      0.000000      0.000000
 8: 4 2  2      2.000000      2.666667
 9: 4 3  3      2.500000      3.375000
10: 4 4 NA      3.333333      2.250000

我首先尝试了以下方法,但没有用。谁能解释一下为什么?

dt[,eval(out_cols) := lapply(.SD, function(x) { return(x[1][[1]]) }), by=i, .SDcols = out_cols]

我从上面的 运行 行得到以下错误:

Error in [.data.table(dt, , :=(eval(out_cols), lapply(.SD, function(x) { : Type of RHS ('double') must match LHS ('list'). To check and coerce would impact performance too much for the fastest cases. Either change the type of the target column, or coerce the RHS of := yourself (e.g. by using 1L instead of 1)