如何以 R "data.table" 方式记录非重叠范围(或时间间隔)?

How to record non-overlapping ranges (or time intervals) in R "data.table" way?

我的 objective 是为每个 ID 定义唯一的非重叠范围(或时间间隔),当每个 ID 有多个可能重叠的曝光范围时。我发现 R "IntervalSurgeon" 包中的 "flatten" 函数可以实现该任务。我的问题是:如何有效地执行相同的任务并以 "data.table" 的方式获得相同的 "tab_out" 输出?

library(data.table)
library(IntervalSurgeon)

set.seed(2019)

N <- 3 # number of IDs

IDs <- paste0("ID", 1:N) # unique IDs

K <- 4 # number of exposures per ID

DT <- data.table(IDs = rep(IDs, each = K), 
    starts = sample(1:20, N * K, replace = T))[,
    ends := starts + sample(1:5, N * K, replace = T)]


DT <- DT[order(IDs, starts),]

tab_out <- DT[, as.list(data.table(
    flatten(as.matrix(cbind(starts, ends))))), 
    by = IDs]

DT
    IDs starts ends
 1: ID1      7   11
 2: ID1     13   17
 3: ID1     15   16
 4: ID1     16   18
 5: ID2      1    5
 6: ID2      1    4
 7: ID2      2    3
 8: ID2     17   19
 9: ID3      3    6
10: ID3     13   16
11: ID3     14   15
12: ID3     16   21

tab_out
   IDs V1 V2
1: ID1  7 11
2: ID1 13 18
3: ID2  1  5
4: ID2 17 19
5: ID3  3  6
6: ID3 13 21

这是一个使用 intervals-package..

的解决方案

示例数据

library( data.table )
library( intervals )

DT <- fread("
IDs starts ends
ID1      7   11
ID1     13   17
ID1     15   16
ID1     16   18
ID2      1    5
ID2      1    4
ID2      2    3
ID2     17   19
ID3      3    6
ID3     13   16
ID3     14   15
ID3     16   21")

代码

myfun <- function( y ) {
  data.table::as.data.table( 
    intervals::interval_union(
      intervals::Intervals( as.matrix( y ) ), check_valid = TRUE ) 
    )
}

DT[, myfun( .SD ), by = .(IDs)]

#    IDs V1 V2
# 1: ID1  7 11
# 2: ID1 13 18
# 3: ID2  1  5
# 4: ID2 17 19
# 5: ID3  3  6
# 6: ID3 13 21

借鉴 David Aurenburg 的解决方案

DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
    .(min(starts), max(ends)), .(g, IDs)]

输出:

   g IDs V1 V2
1: 0 ID1  7 11
2: 1 ID1 13 18
3: 0 ID2  1  5
4: 1 ID2 17 19
5: 0 ID3  3  6
6: 1 ID3 13 21

下面是 "IntervalSurgeon"、"intervals" 和 "data.table" 方法的计算时间。时间是针对包含一百万个 ID 的数据集,每个 ID 曝光 10 次,即 1000 万行。由于 "intervals" 方法花费的时间太长,我已经完成了一个 运行。

机器是 MacBook Pro(15 英寸,2018 年),配备 2.9 GHz Intel Core i9 处理器、32 GB 2400 MHz DDR4 内存,运行在 MacOC Mojave v. 10.14.6 上运行,最多 12线程。

"data.table" 方法是明显的赢家,与 "intervals" 方法是明显的输家一样:

计算时间

"IntervalSurgeon way:"
   user  system elapsed 
469.296   6.528 473.200 

"intervals way:"
    user   system  elapsed 
2463.840    8.137 2476.685 

"data.table way:"
   user  system elapsed 
 22.125   0.133  21.249 

有趣的是,none 方法受益于通过 setDTthreads() 设置大量 data.table 线程。将数据集拆分为 100 等份并通过 parallel::mclapply(mc.cores = 10)传递给 "data.table" 方法使计算时间低于 5 秒(下面也提供了代码)。

"data.table + parallel::mclapply" 组合方法也适用于更大的 50M ID 数据集(生成与 1M ID 数据集相同,但 MM = 50)。这个数据集有 500M 行,为 mclapply 分成 1000 个子集。我设置 mc.cores = 4 以免耗尽 RAM。单个 运行 计算时间为 451.485 秒,这对于笔记本电脑来说是个不错的结果!如果有一天将这种分析与 IntervalSurgeon 包中的其他类似类型的区间分析一起合并到 data.table 包中,那就太好了。

生成数据集

library(data.table)
library(fst)

rm(list = ls())
gc()

set.seed(2019)

# how many millions of IDs required?
MM <- 1 
N <- 1000000 * MM

# unique IDs
IDs <- paste0("ID", 1:N)

# number of exposures per ID
K <- 10 

ss <- sample(1:365, N * K, replace = T)
ff <- sample(1:365, N * K, replace = T)

ss_s <- pmin(ss, ff)
ff_s <- pmax(ss, ff)

DT <- data.table(IDs = rep(IDs, each = K), 
     starts = as.integer(ss_s), 
        ends =  as.integer(ff_s + 1))

DT[order(IDs, starts, ends),]

write_fst(DT, path = paste0("fst_DT_", MM, "Mx3.fst"))

DT2
                IDs starts ends
       1:      ID1      4   22
       2:      ID1     16  233
       3:      ID1     19  224
       4:      ID1     31  227
       5:      ID1     38  147
      ---                     
 9999996: ID999999    152  310
 9999997: ID999999    160  218
 9999998: ID999999    180  272
 9999999: ID999999    215  332
10000000: ID999999    260  265

三种方法的代码

library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)

rm(list = ls())
gc()

threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())

# read the dataset generated above
#####################
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)

print("dataset dimentions:")
print(dim(DT))
#####################

# "intervals" function
#####################
myfun <- function( y ) {
  data.table::as.data.table( 
    intervals::interval_union(
      intervals::Intervals( as.matrix( y ) ), check_valid = TRUE ) 
    )
}
#####################

# 1) "IntervalSurgeon" way
#####################
ptm1 <- proc.time()

tab_out1 <- DT[, as.list(data.table(
    flatten(as.matrix(cbind(starts, ends))))), 
        by = IDs]

exec_time1 <- proc.time() - ptm1
print("IntervalSurgeon way:")
print(exec_time1)
#####################       

# 2) "intervals" way  
##################### 
ptm2 <- proc.time()

tab_out2 <- DT[, myfun( .SD ), by = .(IDs)][,
    c("V1", "V2") := lapply(.SD, as.integer), .SDcols = c("V1", "V2")]

exec_time2 <- proc.time() - ptm2
print("intervals way:")
print(exec_time2)
##################### 

# 3) "data.table" way
#####################
ptm3 <- proc.time()

tab_out3 <- DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
    .(min(starts), max(ends)), .(g, IDs)][,g := NULL]

exec_time3 <- proc.time() - ptm3
print("data.table way:")
print(exec_time3)
#####################  

print(identical(tab_out1, tab_out2))
print(identical(tab_out2, tab_out3))

"data.table + parallel::mclapply" 组合方法

library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
library(parallel)

rm(list = ls())
gc()

mc_cores <- 10

threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())

DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)

# split the dataset into G equal parts
##################### 
G <- 100
nn <- nrow(DT)
step_c <- nn/G

# collect the indexes in the list
list_inx <- vector('list', G)

ii_low <- 1
ii_up <- step_c
for (i2 in 1:G) {

    list_inx[[i2]] <- ii_low:ii_up 
    ii_low <- ii_up+1
    ii_up <- step_c*(i2+1)

    }
##################### 

ptm3 <- proc.time()

# approach implementation
#####################
list_out <- mclapply(1:G, function(i) {
            DT_tmp <- DT[list_inx[[i]],]
            return(DT_tmp[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
                .(min(starts), max(ends)), .(g, IDs)][,g := NULL])
                },
                mc.cores = mc_cores
                )
#####################

exec_time3 <- proc.time() - ptm3
print("data.table + parallel::mclapply:")
print(exec_time3)

SESSION 信息

sessionInfo()
R version 3.5.2 (2018-12-20)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS Mojave 10.14.6

Matrix products: default
BLAS: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRblas.0.dylib
LAPACK: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRlapack.dylib

locale:
[1] en_AU.UTF-8/en_AU.UTF-8/en_AU.UTF-8/C/en_AU.UTF-8/en_AU.UTF-8

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] intervals_0.15.1    IntervalSurgeon_1.0 fst_0.9.0           data.table_1.12.2  

loaded via a namespace (and not attached):
[1] compiler_3.5.2 Rcpp_1.0.1