如何以 R "data.table" 方式记录非重叠范围(或时间间隔)?
How to record non-overlapping ranges (or time intervals) in R "data.table" way?
我的 objective 是为每个 ID 定义唯一的非重叠范围(或时间间隔),当每个 ID 有多个可能重叠的曝光范围时。我发现 R "IntervalSurgeon" 包中的 "flatten" 函数可以实现该任务。我的问题是:如何有效地执行相同的任务并以 "data.table" 的方式获得相同的 "tab_out" 输出?
library(data.table)
library(IntervalSurgeon)
set.seed(2019)
N <- 3 # number of IDs
IDs <- paste0("ID", 1:N) # unique IDs
K <- 4 # number of exposures per ID
DT <- data.table(IDs = rep(IDs, each = K),
starts = sample(1:20, N * K, replace = T))[,
ends := starts + sample(1:5, N * K, replace = T)]
DT <- DT[order(IDs, starts),]
tab_out <- DT[, as.list(data.table(
flatten(as.matrix(cbind(starts, ends))))),
by = IDs]
DT
IDs starts ends
1: ID1 7 11
2: ID1 13 17
3: ID1 15 16
4: ID1 16 18
5: ID2 1 5
6: ID2 1 4
7: ID2 2 3
8: ID2 17 19
9: ID3 3 6
10: ID3 13 16
11: ID3 14 15
12: ID3 16 21
tab_out
IDs V1 V2
1: ID1 7 11
2: ID1 13 18
3: ID2 1 5
4: ID2 17 19
5: ID3 3 6
6: ID3 13 21
这是一个使用 intervals
-package..
的解决方案
示例数据
library( data.table )
library( intervals )
DT <- fread("
IDs starts ends
ID1 7 11
ID1 13 17
ID1 15 16
ID1 16 18
ID2 1 5
ID2 1 4
ID2 2 3
ID2 17 19
ID3 3 6
ID3 13 16
ID3 14 15
ID3 16 21")
代码
myfun <- function( y ) {
data.table::as.data.table(
intervals::interval_union(
intervals::Intervals( as.matrix( y ) ), check_valid = TRUE )
)
}
DT[, myfun( .SD ), by = .(IDs)]
# IDs V1 V2
# 1: ID1 7 11
# 2: ID1 13 18
# 3: ID2 1 5
# 4: ID2 17 19
# 5: ID3 3 6
# 6: ID3 13 21
借鉴 David Aurenburg 的解决方案
DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)]
输出:
g IDs V1 V2
1: 0 ID1 7 11
2: 1 ID1 13 18
3: 0 ID2 1 5
4: 1 ID2 17 19
5: 0 ID3 3 6
6: 1 ID3 13 21
下面是 "IntervalSurgeon"、"intervals" 和 "data.table" 方法的计算时间。时间是针对包含一百万个 ID 的数据集,每个 ID 曝光 10 次,即 1000 万行。由于 "intervals" 方法花费的时间太长,我已经完成了一个 运行。
机器是 MacBook Pro(15 英寸,2018 年),配备 2.9 GHz Intel Core i9 处理器、32 GB 2400 MHz DDR4 内存,运行在 MacOC Mojave v. 10.14.6 上运行,最多 12线程。
"data.table" 方法是明显的赢家,与 "intervals" 方法是明显的输家一样:
计算时间
"IntervalSurgeon way:"
user system elapsed
469.296 6.528 473.200
"intervals way:"
user system elapsed
2463.840 8.137 2476.685
"data.table way:"
user system elapsed
22.125 0.133 21.249
有趣的是,none 方法受益于通过 setDTthreads() 设置大量 data.table 线程。将数据集拆分为 100 等份并通过 parallel::mclapply(mc.cores = 10)传递给 "data.table" 方法使计算时间低于 5 秒(下面也提供了代码)。
"data.table + parallel::mclapply" 组合方法也适用于更大的 50M ID 数据集(生成与 1M ID 数据集相同,但 MM = 50)。这个数据集有 500M 行,为 mclapply 分成 1000 个子集。我设置 mc.cores = 4 以免耗尽 RAM。单个 运行 计算时间为 451.485 秒,这对于笔记本电脑来说是个不错的结果!如果有一天将这种分析与 IntervalSurgeon 包中的其他类似类型的区间分析一起合并到 data.table 包中,那就太好了。
生成数据集
library(data.table)
library(fst)
rm(list = ls())
gc()
set.seed(2019)
# how many millions of IDs required?
MM <- 1
N <- 1000000 * MM
# unique IDs
IDs <- paste0("ID", 1:N)
# number of exposures per ID
K <- 10
ss <- sample(1:365, N * K, replace = T)
ff <- sample(1:365, N * K, replace = T)
ss_s <- pmin(ss, ff)
ff_s <- pmax(ss, ff)
DT <- data.table(IDs = rep(IDs, each = K),
starts = as.integer(ss_s),
ends = as.integer(ff_s + 1))
DT[order(IDs, starts, ends),]
write_fst(DT, path = paste0("fst_DT_", MM, "Mx3.fst"))
DT2
IDs starts ends
1: ID1 4 22
2: ID1 16 233
3: ID1 19 224
4: ID1 31 227
5: ID1 38 147
---
9999996: ID999999 152 310
9999997: ID999999 160 218
9999998: ID999999 180 272
9999999: ID999999 215 332
10000000: ID999999 260 265
三种方法的代码
library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
rm(list = ls())
gc()
threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())
# read the dataset generated above
#####################
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)
print("dataset dimentions:")
print(dim(DT))
#####################
# "intervals" function
#####################
myfun <- function( y ) {
data.table::as.data.table(
intervals::interval_union(
intervals::Intervals( as.matrix( y ) ), check_valid = TRUE )
)
}
#####################
# 1) "IntervalSurgeon" way
#####################
ptm1 <- proc.time()
tab_out1 <- DT[, as.list(data.table(
flatten(as.matrix(cbind(starts, ends))))),
by = IDs]
exec_time1 <- proc.time() - ptm1
print("IntervalSurgeon way:")
print(exec_time1)
#####################
# 2) "intervals" way
#####################
ptm2 <- proc.time()
tab_out2 <- DT[, myfun( .SD ), by = .(IDs)][,
c("V1", "V2") := lapply(.SD, as.integer), .SDcols = c("V1", "V2")]
exec_time2 <- proc.time() - ptm2
print("intervals way:")
print(exec_time2)
#####################
# 3) "data.table" way
#####################
ptm3 <- proc.time()
tab_out3 <- DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)][,g := NULL]
exec_time3 <- proc.time() - ptm3
print("data.table way:")
print(exec_time3)
#####################
print(identical(tab_out1, tab_out2))
print(identical(tab_out2, tab_out3))
"data.table + parallel::mclapply" 组合方法
library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
library(parallel)
rm(list = ls())
gc()
mc_cores <- 10
threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)
# split the dataset into G equal parts
#####################
G <- 100
nn <- nrow(DT)
step_c <- nn/G
# collect the indexes in the list
list_inx <- vector('list', G)
ii_low <- 1
ii_up <- step_c
for (i2 in 1:G) {
list_inx[[i2]] <- ii_low:ii_up
ii_low <- ii_up+1
ii_up <- step_c*(i2+1)
}
#####################
ptm3 <- proc.time()
# approach implementation
#####################
list_out <- mclapply(1:G, function(i) {
DT_tmp <- DT[list_inx[[i]],]
return(DT_tmp[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)][,g := NULL])
},
mc.cores = mc_cores
)
#####################
exec_time3 <- proc.time() - ptm3
print("data.table + parallel::mclapply:")
print(exec_time3)
SESSION 信息
sessionInfo()
R version 3.5.2 (2018-12-20)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS Mojave 10.14.6
Matrix products: default
BLAS: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRblas.0.dylib
LAPACK: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRlapack.dylib
locale:
[1] en_AU.UTF-8/en_AU.UTF-8/en_AU.UTF-8/C/en_AU.UTF-8/en_AU.UTF-8
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] intervals_0.15.1 IntervalSurgeon_1.0 fst_0.9.0 data.table_1.12.2
loaded via a namespace (and not attached):
[1] compiler_3.5.2 Rcpp_1.0.1
我的 objective 是为每个 ID 定义唯一的非重叠范围(或时间间隔),当每个 ID 有多个可能重叠的曝光范围时。我发现 R "IntervalSurgeon" 包中的 "flatten" 函数可以实现该任务。我的问题是:如何有效地执行相同的任务并以 "data.table" 的方式获得相同的 "tab_out" 输出?
library(data.table)
library(IntervalSurgeon)
set.seed(2019)
N <- 3 # number of IDs
IDs <- paste0("ID", 1:N) # unique IDs
K <- 4 # number of exposures per ID
DT <- data.table(IDs = rep(IDs, each = K),
starts = sample(1:20, N * K, replace = T))[,
ends := starts + sample(1:5, N * K, replace = T)]
DT <- DT[order(IDs, starts),]
tab_out <- DT[, as.list(data.table(
flatten(as.matrix(cbind(starts, ends))))),
by = IDs]
DT
IDs starts ends
1: ID1 7 11
2: ID1 13 17
3: ID1 15 16
4: ID1 16 18
5: ID2 1 5
6: ID2 1 4
7: ID2 2 3
8: ID2 17 19
9: ID3 3 6
10: ID3 13 16
11: ID3 14 15
12: ID3 16 21
tab_out
IDs V1 V2
1: ID1 7 11
2: ID1 13 18
3: ID2 1 5
4: ID2 17 19
5: ID3 3 6
6: ID3 13 21
这是一个使用 intervals
-package..
示例数据
library( data.table )
library( intervals )
DT <- fread("
IDs starts ends
ID1 7 11
ID1 13 17
ID1 15 16
ID1 16 18
ID2 1 5
ID2 1 4
ID2 2 3
ID2 17 19
ID3 3 6
ID3 13 16
ID3 14 15
ID3 16 21")
代码
myfun <- function( y ) {
data.table::as.data.table(
intervals::interval_union(
intervals::Intervals( as.matrix( y ) ), check_valid = TRUE )
)
}
DT[, myfun( .SD ), by = .(IDs)]
# IDs V1 V2
# 1: ID1 7 11
# 2: ID1 13 18
# 3: ID2 1 5
# 4: ID2 17 19
# 5: ID3 3 6
# 6: ID3 13 21
借鉴 David Aurenburg 的解决方案
DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)]
输出:
g IDs V1 V2
1: 0 ID1 7 11
2: 1 ID1 13 18
3: 0 ID2 1 5
4: 1 ID2 17 19
5: 0 ID3 3 6
6: 1 ID3 13 21
下面是 "IntervalSurgeon"、"intervals" 和 "data.table" 方法的计算时间。时间是针对包含一百万个 ID 的数据集,每个 ID 曝光 10 次,即 1000 万行。由于 "intervals" 方法花费的时间太长,我已经完成了一个 运行。
机器是 MacBook Pro(15 英寸,2018 年),配备 2.9 GHz Intel Core i9 处理器、32 GB 2400 MHz DDR4 内存,运行在 MacOC Mojave v. 10.14.6 上运行,最多 12线程。
"data.table" 方法是明显的赢家,与 "intervals" 方法是明显的输家一样:
计算时间
"IntervalSurgeon way:"
user system elapsed
469.296 6.528 473.200
"intervals way:"
user system elapsed
2463.840 8.137 2476.685
"data.table way:"
user system elapsed
22.125 0.133 21.249
有趣的是,none 方法受益于通过 setDTthreads() 设置大量 data.table 线程。将数据集拆分为 100 等份并通过 parallel::mclapply(mc.cores = 10)传递给 "data.table" 方法使计算时间低于 5 秒(下面也提供了代码)。
"data.table + parallel::mclapply" 组合方法也适用于更大的 50M ID 数据集(生成与 1M ID 数据集相同,但 MM = 50)。这个数据集有 500M 行,为 mclapply 分成 1000 个子集。我设置 mc.cores = 4 以免耗尽 RAM。单个 运行 计算时间为 451.485 秒,这对于笔记本电脑来说是个不错的结果!如果有一天将这种分析与 IntervalSurgeon 包中的其他类似类型的区间分析一起合并到 data.table 包中,那就太好了。
生成数据集
library(data.table)
library(fst)
rm(list = ls())
gc()
set.seed(2019)
# how many millions of IDs required?
MM <- 1
N <- 1000000 * MM
# unique IDs
IDs <- paste0("ID", 1:N)
# number of exposures per ID
K <- 10
ss <- sample(1:365, N * K, replace = T)
ff <- sample(1:365, N * K, replace = T)
ss_s <- pmin(ss, ff)
ff_s <- pmax(ss, ff)
DT <- data.table(IDs = rep(IDs, each = K),
starts = as.integer(ss_s),
ends = as.integer(ff_s + 1))
DT[order(IDs, starts, ends),]
write_fst(DT, path = paste0("fst_DT_", MM, "Mx3.fst"))
DT2
IDs starts ends
1: ID1 4 22
2: ID1 16 233
3: ID1 19 224
4: ID1 31 227
5: ID1 38 147
---
9999996: ID999999 152 310
9999997: ID999999 160 218
9999998: ID999999 180 272
9999999: ID999999 215 332
10000000: ID999999 260 265
三种方法的代码
library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
rm(list = ls())
gc()
threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())
# read the dataset generated above
#####################
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)
print("dataset dimentions:")
print(dim(DT))
#####################
# "intervals" function
#####################
myfun <- function( y ) {
data.table::as.data.table(
intervals::interval_union(
intervals::Intervals( as.matrix( y ) ), check_valid = TRUE )
)
}
#####################
# 1) "IntervalSurgeon" way
#####################
ptm1 <- proc.time()
tab_out1 <- DT[, as.list(data.table(
flatten(as.matrix(cbind(starts, ends))))),
by = IDs]
exec_time1 <- proc.time() - ptm1
print("IntervalSurgeon way:")
print(exec_time1)
#####################
# 2) "intervals" way
#####################
ptm2 <- proc.time()
tab_out2 <- DT[, myfun( .SD ), by = .(IDs)][,
c("V1", "V2") := lapply(.SD, as.integer), .SDcols = c("V1", "V2")]
exec_time2 <- proc.time() - ptm2
print("intervals way:")
print(exec_time2)
#####################
# 3) "data.table" way
#####################
ptm3 <- proc.time()
tab_out3 <- DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)][,g := NULL]
exec_time3 <- proc.time() - ptm3
print("data.table way:")
print(exec_time3)
#####################
print(identical(tab_out1, tab_out2))
print(identical(tab_out2, tab_out3))
"data.table + parallel::mclapply" 组合方法
library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
library(parallel)
rm(list = ls())
gc()
mc_cores <- 10
threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)
# split the dataset into G equal parts
#####################
G <- 100
nn <- nrow(DT)
step_c <- nn/G
# collect the indexes in the list
list_inx <- vector('list', G)
ii_low <- 1
ii_up <- step_c
for (i2 in 1:G) {
list_inx[[i2]] <- ii_low:ii_up
ii_low <- ii_up+1
ii_up <- step_c*(i2+1)
}
#####################
ptm3 <- proc.time()
# approach implementation
#####################
list_out <- mclapply(1:G, function(i) {
DT_tmp <- DT[list_inx[[i]],]
return(DT_tmp[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)][,g := NULL])
},
mc.cores = mc_cores
)
#####################
exec_time3 <- proc.time() - ptm3
print("data.table + parallel::mclapply:")
print(exec_time3)
SESSION 信息
sessionInfo()
R version 3.5.2 (2018-12-20)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS Mojave 10.14.6
Matrix products: default
BLAS: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRblas.0.dylib
LAPACK: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRlapack.dylib
locale:
[1] en_AU.UTF-8/en_AU.UTF-8/en_AU.UTF-8/C/en_AU.UTF-8/en_AU.UTF-8
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] intervals_0.15.1 IntervalSurgeon_1.0 fst_0.9.0 data.table_1.12.2
loaded via a namespace (and not attached):
[1] compiler_3.5.2 Rcpp_1.0.1