Find nearest matches for each row and sum based on a condition

Consider the following data.table of events:

library(data.table)
breaks <- data.table(id = 1:8,
                     Channel = c("NP1", "NP1", "NP2", "NP2", "NP3", "NP3", "AT4", "AT4"),
                     Time = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
                     Day = c(1, 1, 1, 1, 1, 1, 1, 1),
                     ZA = c(15, 12, 4, 2, 1, 2, 23, 18),
                     stringsAsFactors = F)

breaks
   id Channel Time Day ZA
1:  1     NP1 1000   1 15
2:  2     NP1 1100   1 12
3:  3     NP2  975   1  4
4:  4     NP2 1075   1  2
5:  5     NP3 1010   1  1
6:  6     NP3 1080   1  2
7:  7     AT4 1000   1 23
8:  8     AT4 1050   1 18

For each unique event in breaks, I want to find the nearest event in all other channels using the Time variable, where Day == Day, and then sum the ZA values of those events.

This is the result I want to achieve:

   id Channel Time Day ZA Sum
1:  1     NP1 1000   1 15  28
2:  2     NP1 1100   1 12  22
3:  3     NP2  975   1  4  39
4:  4     NP2 1075   1  2  32
5:  5     NP3 1010   1  1  42
6:  6     NP3 1080   1  2  32
7:  7     AT4 1000   1 23  20
8:  8     AT4 1050   1 18  19

So the channel of the first row is NP1. The closest events in all other channels to Time = 1000 are rows 3, 5 and 7: 4 + 1 + 23 = 28.
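As a sanity check, the value for row 1 can be reproduced in plain base R by taking, per other channel, the ZA of the event whose Time is closest to 1000 (a minimal sketch using only the columns needed):

```r
# Hand-check of row 1 (Channel NP1, Time 1000) using base R only
breaks <- data.frame(Channel = c("NP1", "NP1", "NP2", "NP2", "NP3", "NP3", "AT4", "AT4"),
                     Time    = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
                     ZA      = c(15, 12, 4, 2, 1, 2, 23, 18))

target_time <- 1000
others <- breaks[breaks$Channel != "NP1", ]           # events in the other channels
nearest_za <- sapply(split(others, others$Channel),   # per channel, ZA of the closest event
                     function(ch) ch$ZA[which.min(abs(ch$Time - target_time))])
sum(nearest_za)                                       # 4 + 1 + 23 = 28
```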

I got this working with data.table, using the following code:

breaks[breaks[, c("Day", "Time", "Channel", "ZA")], on = "Day", allow.cartesian = TRUE][
  Channel != i.Channel][
    order(id)][
      , delta := abs(Time - i.Time)][
        , .SD[delta == min(delta)], by = .(Channel, Time, Day, i.Channel)][
          , unique(.SD, by = c("id", "i.Channel"))][
            , .(Sum = sum(i.ZA)), by = .(id, Channel, Time, Day, ZA)]

However, this creates a dataset with 64 rows in the first step, and I want to do this on a dataset with more than a million rows.
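To illustrate the blow-up: joining on Day alone pairs every event with every event on the same day, so a day with n events produces n² intermediate rows before any filtering (a minimal demonstration on the sample data):

```r
# The first step of the data.table pipeline: a join on Day alone is a
# cartesian product per day, so n events on one day give n^2 rows
library(data.table)
breaks <- data.table(id = 1:8,
                     Channel = rep(c("NP1", "NP2", "NP3", "AT4"), each = 2),
                     Time = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
                     Day = 1,
                     ZA = c(15, 12, 4, 2, 1, 2, 23, 18))

joined <- breaks[breaks[, c("Day", "Time", "Channel", "ZA")],
                 on = "Day", allow.cartesian = TRUE]
nrow(joined)   # 8 * 8 = 64
```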

Can anyone help me find a more efficient way to do this?

Edit:

I tried the solutions of G. Grothendieck (sqldf), eddi (data.table) and MarkusN (dplyr) once on my full dataset of 1.4 million rows with 39 different channels. The dataset fits in memory.


sqldf:      54 minutes
data.table: 11 hours
dplyr:      29 hours

Here is a solution using dplyr and a self-join:

library(dplyr)
breaks %>% 
  inner_join(breaks, by=c("Day"), suffix=c("",".y")) %>%  # self-join
  filter(Channel != Channel.y) %>%                        # ignore events of same channel
  group_by(id, Channel, Time, Day, ZA, Channel.y) %>%     # build group for every event
  arrange(abs(Time - Time.y)) %>%                         # sort by minimal time-diff
  filter(row_number()==1) %>%                             # keep just row with minimal time-diff
  group_by(id, Channel, Time, Day, ZA) %>%                # group by all columns of original event
  summarise(Sum=sum(ZA.y)) %>%                            # sum ZA of other channels
  ungroup() %>% 
  select(id:Sum)

Maybe I have to be more specific in my answer. Unlike data.table, dplyr is able to translate code into SQL. So if your data is stored in a database, you can connect directly to the table containing it. All (or most) of the dplyr code is then evaluated in your DBMS. Since performing joins is a core task of every DBMS, you don't have to worry about performance.
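A minimal sketch of that translation (assuming an SQLite database via DBI/RSQLite and the dbplyr backend; any DBMS supported by dbplyr would work the same way) — show_query() displays the SQL that would run in the database, without pulling data into R:

```r
# Sketch: the self-join expressed against a database table; dbplyr
# translates the pipeline to SQL (RSQLite is used purely for illustration)
library(dplyr)
library(dbplyr)

con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
DBI::dbWriteTable(con, "breaks",
                  data.frame(id = 1:8,
                             Channel = rep(c("NP1", "NP2", "NP3", "AT4"), each = 2),
                             Time = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
                             Day = 1,
                             ZA = c(15, 12, 4, 2, 1, 2, 23, 18)))

q <- tbl(con, "breaks") %>%
  inner_join(tbl(con, "breaks"), by = "Day") %>%   # self-join in the DBMS
  filter(Channel.x != Channel.y)
show_query(q)            # prints the SQL that would run in the database
```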

However, if your data is imported into R and you're worried about RAM limits, you have to iterate over every row of the data frame. That can also be done with dplyr:

library(dplyr)
breaks %>% 
rowwise() %>% 
do({
  row = as_data_frame(.)
  df =
    breaks %>%
    filter(Day == row$Day & Channel != row$Channel) %>% 
    mutate(time_diff = abs(Time - row$Time)) %>% 
    group_by(Channel) %>% 
    arrange(time_diff, .by_group = TRUE) %>% 
    filter(row_number()==1) %>% 
    ungroup() %>% summarise(sum(ZA))

  row %>% mutate(sumZA = df[[1]])
})

In the inner select, each row of breaks is self-joined to the rows of breaks with the same day and a different channel, and then out of all the rows joined to a particular original row, only the joined row with the minimum absolute time difference is kept. In the outer select, the ZA from the other channels within each id is summed, giving the result.

Note that we are assuming the default SQLite backend to sqldf here and are making use of a feature specific to that database, namely that if min is used in a select, then the other values specified in that select will also be populated from the minimizing row.

By default it will use an in-memory database, which is best if it fits, but if you specify dbname = tempfile() as an argument to sqldf, it will use a file as an out-of-memory database instead. It is also possible to add one or more indexes, which may or may not speed it up. See the sqldf github home page for more examples.

library(sqldf)

sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
 from (
   select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
   from breaks a join breaks b on a.Day = b.Day and a.Channel != b.Channel
   group by a.id, b.Channel)
 group by id")

giving:

  id Channel Time Day ZA Sum
1  1     NP1 1000   1 15  28
2  2     NP1 1100   1 12  22
3  3     NP2  975   1  4  39
4  4     NP2 1075   1  2  32
5  5     NP3 1010   1  1  42
6  6     NP3 1080   1  2  32
7  7     AT4 1000   1 23  20
8  8     AT4 1050   1 18  19

This is slightly faster than the data.table code in the question on a problem of this size, but for larger problems the comparison would have to be redone.

Also, it may be able to handle larger sizes, since it does not have to materialize intermediate results (depending on the query optimizer) and has the possibility of handling out-of-memory data (if needed).
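The out-of-memory variant with an index mentioned above could look like this (the index name bi and the choice of indexed columns are illustrative assumptions; whether the index actually helps would need to be measured). Note that after creating an index, the table is referenced as main.breaks so that sqldf does not re-upload it:

```r
# Sketch: file-backed SQLite database (dbname = tempfile()) plus an index;
# index name 'bi' and its columns are illustrative, not benchmarked
library(sqldf)

breaks <- data.frame(id = 1:8,
                     Channel = rep(c("NP1", "NP2", "NP3", "AT4"), each = 2),
                     Time = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
                     Day = 1,
                     ZA = c(15, 12, 4, 2, 1, 2, 23, 18))

res <- sqldf(c("create index bi on breaks(Day, Channel)",
               "select id, Channel, Time, Day, ZA, sum(bZA) Sum
                  from (select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
                          from main.breaks a
                          join main.breaks b
                            on a.Day = b.Day and a.Channel != b.Channel
                         group by a.id, b.Channel)
                 group by id"),
             dbname = tempfile())
res
```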

library(data.table)
library(dplyr)
library(sqldf)
library(rbenchmark)

benchmark(sqldf = 
sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
 from (
   select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
   from breaks a join breaks b on a.Day = b.Day and a.Channel != b.Channel
   group by a.id, b.Channel)
 group by id"),

data.table = breaks[breaks[, c("Day", "Time", "Channel", "ZA")], on = "Day",
     allow.cartesian = TRUE][
  Channel != i.Channel][
    order(id)][
      , delta := abs(Time - i.Time)][
        , .SD[delta == min(delta)], by = .(Channel, Time, Day, i.Channel)][
          , unique(.SD, by = c("id", "i.Channel"))][
            , .(Sum = sum(i.ZA)), by = .(id, Channel, Time, Day, ZA)],

dplyr = { breaks %>% 
  inner_join(breaks, by=c("Day"), suffix=c("",".y")) %>%
  filter(Channel != Channel.y) %>%
  group_by(id, Channel, Time, Day, ZA, Channel.y) %>%
  arrange(abs(Time - Time.y)) %>%
  filter(row_number()==1) %>%
  group_by(id, Channel, Time, Day, ZA) %>%
  summarise(Sum=sum(ZA.y)) %>%                           
  ungroup() %>% 
  select(id:Sum) },

order = "elapsed")[1:4]

giving:

        test replications elapsed relative
1      sqldf          100    3.38    1.000
2 data.table          100    4.05    1.198
3      dplyr          100    9.23    2.731

I'm not sure how fast this will be (probably slow), but it will be very conservative memory-wise:

Channels = breaks[, unique(Channel)]
breaks[, Sum := breaks[breaks[row,
                              .(Day, Channel = setdiff(Channels, Channel), Time)],
                       on = .(Day, Channel, Time), roll = 'nearest',
                       sum(ZA)]
       , by = .(row = 1:nrow(breaks))]

It might help the speed to setkey(breaks, Day, Channel, Time) instead of using on.
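With the key set, the same rolling join can be written without on = (a sketch; the results should be identical, since roll = 'nearest' then applies to the last key column, Time, but the speed difference is untested):

```r
# Keyed version of the rolling join above (results should match;
# the speed benefit, if any, is an untested assumption)
library(data.table)
breaks <- data.table(id = 1:8,
                     Channel = rep(c("NP1", "NP2", "NP3", "AT4"), each = 2),
                     Time = c(1000, 1100, 975, 1075, 1010, 1080, 1000, 1050),
                     Day = 1,
                     ZA = c(15, 12, 4, 2, 1, 2, 23, 18))

setkey(breaks, Day, Channel, Time)     # join columns now come from the key
Channels <- breaks[, unique(Channel)]
breaks[, Sum := breaks[breaks[row,
                              .(Day, Channel = setdiff(Channels, Channel), Time)],
                       roll = 'nearest', sum(ZA)]
       , by = .(row = 1:nrow(breaks))]
breaks[order(id)]
```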

Came across this question and saw the timings in the OP's edit, so here's a possible Rcpp approach:

library(Rcpp)
#library(inline)
nearsum <- cppFunction('
NumericVector closestSum(NumericVector cid, NumericVector Time, NumericVector ZA) {
    int d, mintime, mintimeZA, prevChannel = 0, nextChannel = 0;
    int sz = cid.size();
    NumericVector sumvec(sz);

    for (int r = 0; r < sz; r++) {
        sumvec[r] = 0;
        mintime = 10000;
        //Rcpp::Rcout << "Beginning row = " << r << std::endl;

        for (int i = 0; i < sz; i++) {
            if (cid[r] != cid[i]) {
                //Rcpp::Rcout << "Current idx = " << i << std::endl;

                //handle boundary conditions
                if (i == 0) {
                    prevChannel = 0;    
                } else {
                    prevChannel = cid[i-1];
                }

                if (i == sz - 1) {
                    nextChannel = 0;    
                } else {
                    nextChannel = cid[i+1];
                }

                //calculate time difference
                d = abs(Time[i] - Time[r]);

                if (cid[i] != prevChannel) {
                    ///this is a new channel
                    mintime = d;
                    mintimeZA = ZA[i];
                } else {
                    if (d < mintime) {
                        //this is a new min in time diff
                        mintime = d;
                        mintimeZA = ZA[i];
                    }
                }

                //Rcpp::Rcout << "Time difference = " << d << std::endl;
                //Rcpp::Rcout << "ZA for current min time gap = " << mintimeZA << std::endl;

                if (cid[i] != nextChannel) {
                    //this is the last data point for this channel
                    mintime = 10000;
                    sumvec[r] += mintimeZA;
                    //Rcpp::Rcout << "Final sum for current row = " << sumvec[r] << std::endl;
                }
            }
        }
    }
    return sumvec;
}
')

Calling the cpp function:

library(data.table)
setorder(breaks, id, Channel, Day, Time)
breaks[, ChannelID := .GRP, by=Channel]
breaks[, Sum := nearsum(ChannelID, Time, ZA), by=.(Day)]

Output:

   id Channel Time Day ZA ChannelID Sum
1:  1     NP1 1000   1 15         1  28
2:  2     NP1 1100   1 12         1  22
3:  3     NP2  975   1  4         2  39
4:  4     NP2 1075   1  2         2  32
5:  5     NP3 1010   1  1         3  42
6:  6     NP3 1080   1  2         3  32
7:  7     AT4 1000   1 23         4  20
8:  8     AT4 1050   1 18         4  19

Timing code:

#create a larger dataset
largeBreaks <- rbindlist(lapply(1:1e5, function(n) copy(breaks)[, Day := n]))
setorder(largeBreaks, Day, Channel, Time)
largeBreaks[, id := .I]

library(sqldf)
mtd0 <- function() {
    sqldf("select id, Channel, Time, Day, ZA, sum(bZA) Sum
     from (
       select a.*, b.ZA bZA, min(abs(a.Time - b.Time))
       from largeBreaks a join largeBreaks b on a.Day = b.Day and a.Channel != b.Channel
       group by a.id, b.Channel)
     group by id")
}

mtd1 <- function() {
    setorder(largeBreaks, Day, Channel, Time)
    largeBreaks[, ChannelID := .GRP, by=Channel]
    largeBreaks[, Sum := nearsum(ChannelID, Time, ZA), by=.(Day)]
}

library(microbenchmark)
microbenchmark(mtd0(), mtd1(), times=3L)

Timings [around 5 seconds (at least on my machine) need to be added to compile the cpp function]:

Unit: milliseconds
   expr        min         lq       mean    median         uq        max neval
 mtd0() 10449.6696 10505.7669 10661.7734 10561.864 10767.8252 10973.7863     3
 mtd1()   365.4157   371.2594   386.6866   377.103   397.3221   417.5412     3