Find nearest matches for each row and sum based on a condition
Consider the following data.table of events:
library(data.table)
breaks <- data.table(id = 1:8,
Channel = c("NP1", "NP1", "NP2", "NP2", "NP3", "NP3", "AT4", "AT4"),
Time = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
Day = c(1, 1, 1, 1, 1, 1, 1, 1),
ZA = c(15, 12, 4, 2, 1, 2, 23, 18),
stringsAsFactors = F)
breaks
id Channel Time Day ZA
1: 1 NP1 1000 1 15
2: 2 NP1 1100 1 12
3: 3 NP2 975 1 4
4: 4 NP2 1075 1 2
5: 5 NP3 1010 1 1
6: 6 NP3 1080 1 2
7: 7 AT4 1000 1 23
8: 8 AT4 1050 1 18
For each unique event in breaks I want to use the Time variable to find the nearest event in all the other channels where Day == Day, and then sum the ZA values of those events.
This is the result I want to achieve:
id Channel Time Day ZA Sum
1: 1 NP1 1000 1 15 28
2: 2 NP1 1100 1 12 22
3: 3 NP2 975 1 4 39
4: 4 NP2 1075 1 2 32
5: 5 NP3 1010 1 1 42
6: 6 NP3 1080 1 2 32
7: 7 AT4 1000 1 23 20
8: 8 AT4 1050 1 18 19
So the channel in the first row is NP1. The closest events in all the other channels to Time = 1000 are rows 3, 5 and 7: 4 + 1 + 23 = 28.
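As a quick check, that sum can be reproduced directly for row 1 (a minimal sketch using the breaks table above; this is just an illustration, not my attempt below):
# for row 1 (NP1, Time = 1000): take the nearest event per other channel, then sum ZA
breaks[Channel != "NP1" & Day == 1,
       .SD[which.min(abs(Time - 1000))], by = Channel][, sum(ZA)]
# returns 28, matching the Sum shown above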
I got it to work using data.table with the following code:
breaks[breaks[, c("Day", "Time", "Channel", "ZA")], on = "Day", allow.cartesian = TRUE][
Channel != i.Channel][
order(id)][
, delta := abs(Time - i.Time)][
, .SD[delta == min(delta)], by = .(Channel, Time, Day, i.Channel)][
, unique(.SD, by = c("id", "i.Channel"))][
, .(Sum = sum(i.ZA)), by = .(id, Channel, Time, Day, ZA)]
However, this creates a 64-row dataset in the first step, and I want to do the same on a dataset of over a million rows. Can anyone help me find a more efficient way?
Edit:
I tried the solutions of G. Grothendieck (sqldf), eddi (data.table) and MarkusN (dplyr) once each on my full dataset of 1.4 million rows with 39 distinct channels. The dataset fits in memory.
sqldf: 54 minutes
data.table: 11 hours
dplyr: 29 hours
Here is a solution using dplyr and a self-join:
library(dplyr)
breaks %>%
inner_join(breaks, by=c("Day"), suffix=c("",".y")) %>% # self-join
filter(Channel != Channel.y) %>% # ignore events of same channel
group_by(id, Channel, Time, Day, ZA, Channel.y) %>% # build group for every event
arrange(abs(Time - Time.y)) %>% # sort by minimal time-diff
filter(row_number()==1) %>% # keep just row with minimal time-diff
group_by(id, Channel, Time, Day, ZA) %>% # group by all columns of original event
summarise(Sum=sum(ZA.y)) %>% # sum ZA of other channels
ungroup() %>%
select(id:Sum)
Maybe I have to be more specific in my answer. Unlike data.table, dplyr is able to translate its code into SQL, so if your data is stored in a database you can connect directly to the table containing it. All (or most) of the dplyr code is then evaluated in your DBMS, and since performing joins is a core task of every DBMS, you don't have to worry about performance there.
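For example, a minimal sketch of the database route (the connection setup here is hypothetical; an in-memory SQLite copy stands in for your real table):
library(DBI)
library(dplyr)
library(dbplyr)

# hypothetical setup: copy breaks into an in-memory SQLite database
con <- dbConnect(RSQLite::SQLite(), ":memory:")
breaks_db <- copy_to(con, breaks, "breaks")

# the same style of pipeline now runs lazily inside the DBMS
breaks_db %>%
  inner_join(breaks_db, by = "Day", suffix = c("", ".y")) %>%
  filter(Channel != Channel.y) %>%
  show_query()   # inspect the generated SQL instead of collecting results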
However, if your data has been imported into R and you are worried about RAM limits, you have to iterate over each row of the data frame. That can also be done with dplyr:
library(dplyr)
breaks %>%
  rowwise() %>%
  do({
    row = as_data_frame(.)
    df =
      breaks %>%
      filter(Day == row$Day & Channel != row$Channel) %>%  # same day, other channels only
      mutate(time_diff = abs(Time - row$Time)) %>%         # distance to the current event
      group_by(Channel) %>%
      arrange(time_diff, .by_group = TRUE) %>%             # nearest event first within each channel
      filter(row_number() == 1) %>%                        # keep only the nearest per channel
      ungroup() %>% summarise(sum(ZA))                     # sum ZA over those nearest events
    row %>% mutate(sumZA = df[[1]])
  })
In the inner select, each row of breaks is self-joined with the rows of breaks having the same day and a different channel, and then among all the rows joined to a particular original row, only the joined rows having the minimum absolute time difference are kept. In the outer select, the ZA values from the other channels are summed within id, giving the result.
Note that we are assuming the default SQLite backend for sqldf here and are using a feature specific to that database: if min is used in a select, the other values specified in that select are also populated from the minimizing row.
By default it uses an in-memory database, which is best if the data fits, but if you specify dbname = tempfile() as an argument to sqldf it will use a file as an out-of-memory database instead. It would also be possible to add one or more indexes, which may or may not speed things up. See the sqldf github home page for more examples.
library(sqldf)
sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
from (
select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
from breaks a join breaks b on a.Day = b.Day and a.Channel != b.Channel
group by a.id, b.Channel)
group by id")
giving:
id Channel Time Day ZA Sum
1 1 NP1 1000 1 15 28
2 2 NP1 1100 1 12 22
3 3 NP2 975 1 4 39
4 4 NP2 1075 1 2 32
5 5 NP3 1010 1 1 42
6 6 NP3 1080 1 2 32
7 7 AT4 1000 1 23 20
8 8 AT4 1050 1 18 19
This is slightly faster than the data.table code in the question on a problem of this size, but for larger problems the comparison would have to be redone. Also, it may be able to handle larger inputs, since it does not have to materialize intermediate results (depending on the query optimizer) and can work out-of-memory if need be.
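A minimal sketch of the out-of-memory variant with an index (whether the index helps is untested; the main. prefix is needed so sqldf does not re-upload the table after indexing):
library(sqldf)

# same query, but backed by a temporary file instead of RAM,
# with an index on the join columns created first
sqldf(c("create index idx on breaks(Day, Channel)",
        "select id, Channel, Time, Day, ZA, sum(bZA) Sum
         from (
           select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
           from main.breaks a join main.breaks b
             on a.Day = b.Day and a.Channel != b.Channel
           group by a.id, b.Channel)
         group by id"),
      dbname = tempfile())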
library(data.table)
library(dplyr)
library(sqldf)
library(rbenchmark)
benchmark(sqldf =
sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
from (
select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
from breaks a join breaks b on a.Day = b.Day and a.Channel != b.Channel
group by a.id, b.Channel)
group by id"),
data.table = breaks[breaks[, c("Day", "Time", "Channel", "ZA")], on = "Day",
allow.cartesian = TRUE][
Channel != i.Channel][
order(id)][
, delta := abs(Time - i.Time)][
, .SD[delta == min(delta)], by = .(Channel, Time, Day, i.Channel)][
, unique(.SD, by = c("id", "i.Channel"))][
, .(Sum = sum(i.ZA)), by = .(id, Channel, Time, Day, ZA)],
dplyr = { breaks %>%
inner_join(breaks, by=c("Day"), suffix=c("",".y")) %>%
filter(Channel != Channel.y) %>%
group_by(id, Channel, Time, Day, ZA, Channel.y) %>%
arrange(abs(Time - Time.y)) %>%
filter(row_number()==1) %>%
group_by(id, Channel, Time, Day, ZA) %>%
summarise(Sum=sum(ZA.y)) %>%
ungroup() %>%
select(id:Sum) },
order = "elapsed")[1:4]
giving:
test replications elapsed relative
1 sqldf 100 3.38 1.000
2 data.table 100 4.05 1.198
3 dplyr 100 9.23 2.731
I'm not sure how fast this is (probably slow), but it will be very conservative memory-wise:
Channels = breaks[, unique(Channel)]

# for each row, build a lookup of (Day, every other Channel, Time),
# roll-join it to the nearest Time and sum the matched ZA values
breaks[, Sum := breaks[breaks[row,
                       .(Day, Channel = setdiff(Channels, Channel), Time)],
                       on = .(Day, Channel, Time), roll = 'nearest',
                       sum(ZA)]
       , by = .(row = 1:nrow(breaks))]
It will probably help speed-wise to setkey(breaks, Day, Channel, Time) instead of using on.
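A sketch of that keyed variant (the speed benefit is untested):
# key the table once so the rolling join no longer needs on=
setkey(breaks, Day, Channel, Time)
Channels = breaks[, unique(Channel)]
breaks[, Sum := breaks[breaks[row,
                       .(Day, Channel = setdiff(Channels, Channel), Time)],
                       roll = 'nearest', sum(ZA)]
       , by = .(row = 1:nrow(breaks))]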
Came across this and saw the timings in the OP's edit. Hence, proposing a possible Rcpp approach:
library(Rcpp)
#library(inline)
nearsum <- cppFunction('
NumericVector closestSum(NumericVector cid, NumericVector Time, NumericVector ZA) {
int d, mintime, mintimeZA, prevChannel = 0, nextChannel = 0;
int sz = cid.size();
NumericVector sumvec(sz);
for (int r = 0; r < sz; r++) {
sumvec[r] = 0;
mintime = 10000;
//Rcpp::Rcout << "Beginning row = " << r << std::endl;
for (int i = 0; i < sz; i++) {
if (cid[r] != cid[i]) {
//Rcpp::Rcout << "Current idx = " << i << std::endl;
//handle boundary conditions
if (i == 0) {
prevChannel = 0;
} else {
prevChannel = cid[i-1];
}
if (i == sz - 1) {
nextChannel = 0;
} else {
nextChannel = cid[i+1];
}
//calculate time difference
d = abs(Time[i] - Time[r]);
if (cid[i] != prevChannel) {
///this is a new channel
mintime = d;
mintimeZA = ZA[i];
} else {
if (d < mintime) {
//this is a new min in time diff
mintime = d;
mintimeZA = ZA[i];
}
}
//Rcpp::Rcout << "Time difference = " << d << std::endl;
//Rcpp::Rcout << "ZA for current min time gap = " << mintimeZA << std::endl;
if (cid[i] != nextChannel) {
//this is the last data point for this channel
mintime = 10000;
sumvec[r] += mintimeZA;
//Rcpp::Rcout << "Final sum for current row = " << sumvec[r] << std::endl;
}
}
}
}
return sumvec;
}
')
Calling the cpp function:
library(data.table)
setorder(breaks, id, Channel, Day, Time)
breaks[, ChannelID := .GRP, by=Channel]
breaks[, Sum := nearsum(ChannelID, Time, ZA), by=.(Day)]
Output:
id Channel Time Day ZA ChannelID Sum
1: 1 NP1 1000 1 15 1 28
2: 2 NP1 1100 1 12 1 22
3: 3 NP2 975 1 4 2 39
4: 4 NP2 1075 1 2 2 32
5: 5 NP3 1010 1 1 3 42
6: 6 NP3 1080 1 2 3 32
7: 7 AT4 1000 1 23 4 20
8: 8 AT4 1050 1 18 4 19
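As a sanity check (a sketch, using the sqldf answer above as the reference on the 8-row example):
library(sqldf)
ref <- sqldf("select id, sum(bZA) Sum
              from (
                select a.id, b.ZA bZA, min(abs(a.Time - b.Time))
                from breaks a join breaks b on a.Day = b.Day and a.Channel != b.Channel
                group by a.id, b.Channel)
              group by id")
all.equal(breaks[order(id), Sum], ref$Sum)  # should be TRUE if the two methods agree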
Timing code:
#create a larger dataset
largeBreaks <- rbindlist(lapply(1:1e5, function(n) copy(breaks)[, Day := n]))
setorder(largeBreaks, Day, Channel, Time)
largeBreaks[, id := .I]
library(sqldf)
mtd0 <- function() {
sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
from (
select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
from largeBreaks a join largeBreaks b on a.Day = b.Day and a.Channel != b.Channel
group by a.id, b.Channel)
group by id")
}
mtd1 <- function() {
setorder(largeBreaks, Day, Channel, Time)
largeBreaks[, ChannelID := .GRP, by=Channel]
largeBreaks[, Sum := nearsum(ChannelID, Time, ZA), by=.(Day)]
}
library(microbenchmark)
microbenchmark(mtd0(), mtd1(), times=3L)
Timings [add about 5 seconds (at least on my machine) to compile the cpp function]:
Unit: milliseconds
expr min lq mean median uq max neval
mtd0() 10449.6696 10505.7669 10661.7734 10561.864 10767.8252 10973.7863 3
mtd1() 365.4157 371.2594 386.6866 377.103 397.3221 417.5412 3