如何实现滚动连接 "without replacement"(源 table 中的一行应映射到结果中的 0 或 1 行)

How to implement a rolling join "without replacement" (a row in either source table should map to 0 or 1 rows in the result)

考虑 purchasessales

的这些表格
purchases <- data.table(
  purchase_id = c(10,20,30,40,50,60),
  store = c("a", "a", "a", "b", "b", "b"),
  date = c(1,1,2,3,3,3)
)

sales <- data.table(
  sale_id = c(1,2,3,4,5,6),
  store = c("a", "a", "a", "b", "b", "b"),
  date = c(1,1,1,3,3,4)
)

> purchases
    purchase_id store date
1:           10     a    1
2:           20     a    1
3:           30     a    2
4:           40     b    3
5:           50     b    3
6:           60     b    3
> sales
   sale_id store date
1:       1     a    1
2:       2     a    1
3:       3     a    1
4:       4     b    3
5:       5     b    3
6:       6     b    4

我想将每次购买映射到同时或稍后(在同一家商店)发生的销售。问题是 一次购买应恰好映射到一次或 none 销售,反之亦然

有多种解决方案可以满足我的要求,但一个简单的解决方案遵循以下算法:

For each purchase:
  Subset sales where sale store matches purchase store and sale date >= purchase date
  Select the first sale in the subset and map it to this purchase
  REMOVE THIS SALE FROM THE sales TABLE!

这会产生像

这样的映射
    purchase_id sale_id
1:           10       1
2:           20       2
3:           30      NA
4:           40       4
5:           50       5
6:           60       6

data.table 有没有优雅的方法来做到这一点?


肮脏的解决方案

这是我开发的一个肮脏但可行的解决方案。

rolling_join_without_replacement <- function(x, i, on, roll, allow.cartesian = FALSE){
  # Dirty implementation of a rolling join matching algo without replacement
  # Each row in i maps to exactly one row in the result
  # Each row in x maps to exactly zero or one rows in the result
  
  # Copy x and i
  x2 <- copy(x)
  i2 <- copy(i)
  
  # Create row id fields for each table
  x2[, x_row := .I]
  i2[, i_row := .I]
  
  allmatches <- list()
  while(TRUE){
    
    # Execute the rolling join
    matches <- x2[i2, on = on, roll = roll, allow.cartesian = allow.cartesian, nomatch = 0L]
    
    # If no matches, break
    if(nrow(matches) == 0) break
    
    # Get the first match per i, then get the first match per x
    matches <- matches[matches[, .I[1L], by = i_row]$V1]
    matches <- matches[matches[, .I[1L], by = x_row]$V1]
    
    # Save these matches
    allmatches <- c(allmatches, list(matches))
    
    # Exclude these x and i from future matches
    x2 <- x2[!matches, on = "x_row"]
    i2 <- i2[!matches, on = "i_row"]
  }
  
  # Combine matches
  allmatches <- rbindlist(allmatches, use.names = TRUE)
  
  # Include unmatched i rows
  unmatched <- i2[!allmatches, on = "i_row"]
  allmatches <- rbind(allmatches, unmatched, use.names = TRUE, fill = TRUE)
  
  return(allmatches[])
}

用法

rolling_join_without_replacement(
  x = sales, 
  i = purchases, 
  on = c("store", "date"), 
  roll = -Inf, 
  allow.cartesian = TRUE
)

    purchase_id sale_id
1:           10       1
2:           20       2
3:           30      NA
4:           40       4
5:           50       5
6:           60       6

你可以试试这个:

x = sales[purchases, on=.(store,date>=mindate)][order(store,date, sale_id, purchase_id)]

res=x[0]

f <- function(df,res) df[sale_id %in% res$sale_id==F][sale_id == min(sale_id)]

for(p in unique(x$purchase_id)) res = rbind(res,f(x[purchase_id==p],res))

res = rbind(res,x[is.na(sale_id)])

输出:

   sale_id  store  date purchase_id
     <num> <char> <num>       <num>
1:       1      a     1          10
2:       2      a     1          20
3:       4      b     3          40
4:       5      b     3          50
5:       6      b     3          60
6:      NA      a     2          30

一个选择是左 non-equi 加入并让 data.table 找到每次购买的所有匹配项, 使用更多内存, 然后通过按 sale_id 对结果进行排序并利用它进行过滤:

sales[purchases, on = .(store, date >= date), allow.cartesian = TRUE][
    , by = store, .SDcols = c("purchase_id", "sale_id"), {
        ordered_sd <- .SD[order(sale_id, purchase_id)]
        purchase_id <- ordered_sd$purchase_id
        sale_id <- ordered_sd$sale_id

        purchase_buffer <- sale_buffer <- purchases[.BY$store, purchase_id, on = "store"]
        buffer_index <- 1L
        #cat(address(purchase_buffer), address(sale_buffer), "first", sep = "\n")

        current_pid <- NULL
        max_sid <- 0L

        do_updates <- function(i, sid) {
            current_pid <<- purchase_id[i]

            if (!is.na(sid)) {
                max_sid <<- sid
            }

            purchase_buffer[buffer_index] <<- current_pid
            sale_buffer[buffer_index] <<- sid
            buffer_index <<- buffer_index + 1L
            #cat(address(purchase_buffer), address(sale_buffer), "update", sep = "\n")
        }

        for (i in seq_along(purchase_id)) {
            if (is.null(current_pid)) {
                do_updates(i, sale_id[i])
            }

            pid <- purchase_id[i]

            if (pid > current_pid) {
                if (purchase_buffer[buffer_index - 1L] != current_pid) {
                    do_updates(i - 1L, NA_integer_)
                }

                sid <- sale_id[i]

                if (!is.na(sid) && sid > max_sid) {
                    do_updates(i, sid)
                }
            }
        }

        if (purchase_buffer[buffer_index - 1L] != pid) {
            do_updates(i, NA_integer_)
        }

        list(
            purchase_id = purchase_buffer,
            sale_id = sale_buffer
        )
    }]

其作用的细分:

  1. 左边non-equi加入
  2. 在每个 store 组中主要按 sale_id 排序,以防 ID 可以在所有商店中重复。
  3. 创建缓冲区。由于您希望每个购买 ID 一行, 我们可以从原始 table 的 purchase_id 列开始: purchases[.BY$store, purchase_id, on = "store"]。 参见 secondary indices
  4. 因为我们现在在 sale_id 有一个订单, 我们可以遍历这些值并通过跟踪循环中看到的最大 ID (max_sid) 为每次购买提取一个 ID。 如果我们没有找到任何购买的销售 ID,我们输入 NA.

一些注意事项:

  • 您的代码不会生成整数 ID, 但我假设它们是整数, 这就是我使用 NA_integer_ 的原因。 如果您的 ID 不是整数(参见 storage.mode), 使用 NA_real_ 避免合并。
  • 您可以看到一些对 cat 的注释调用,我用来检查是否正在为缓冲区制作副本。 好像只正确复制了一次,然后原地修改。
  • 循环后的附加 if 分支用于最后一次购买,如果没有任何匹配的销售, 因为在那种情况下 pid > current_pid && purchase_buffer[buffer_index - 1L] != current_pid 永远不会是真的。

现在,如果您的实际数据集不接受额外的内存利用率table, 您可以通过在还指定了 by:

的框架内加入 .SD 来实现更小的联接
sales[, by = store, .SD[purchases[.BY$store, on = "store"], .(purchase_id, sale_id), on = .(date >= date), allow.cartesian = TRUE][
    order(sale_id, purchase_id)][
        , {
            ...
        }] # the whole 'j' expression for 'sales[...]' ends here
]

{} 之间的代码与之前几乎相同,只是没有创建 ordered_sd 和它下面的两行。

在这种情况下,在每次调用中,.SD 将包含给定商店的 sales 的子集, 这也是为什么我们用 purchases[.BY$store, on = "store"] 手动创建 purchases 的一个子集,这样我们只加入同一商店的行。

然而,即使你使用第二种方法, purchases 中的每一行都可能匹配 sales 中的很多行, 所以你可以考虑应用一种 window 这样匹配就不会考虑时间很远的事情。 为此,您可以将类似这样的内容添加到 sales:

sales[, match_limit_inclusive := date - 2]

并将连接条件更改为:

on = .(date >= date, match_limit_inclusive <= date)

根据 OP,目标是

to map each purchase to the sale that occurred at the same time or later (and at the same store). The catch is one purchase should be mapped to exactly one or none sales, and vice-versa.

如果我理解正确,OP 正在寻求在删除购买前发生的销售事件后(针对每个商店)将购买 ID 向量与销售 ID 向量对齐。

这是一种使用 non-equi joinrowid() 来选择对齐行的方法:

library(data.table)
sales[purchases, on = c("store", "date>=date"), 
  .(store, purchase_id, sale_id = sale_id[x.date >= i.date])][
    rowid(store, purchase_id) == rowid(store, sale_id)]

修改后的用例结果(以涵盖更多边缘情况,例如,更多商店):

   store purchase_id sale_id
1:     a          10       1
2:     a          20       2
3:     a          30      NA
4:     b          40       5
5:     b          50       6
6:     b          60       7
7:     d          70      NA

请注意,包含 store 是为了安全和完整,因为 purchase_idsale_id 可能并非在所有商店中都是唯一的。

另请注意,结果在很大程度上取决于 purchasessales 中的行顺序。

数据

修改示例数据以涵盖更多边缘情况:

purchases <- data.table(
  purchase_id = c(10,20,30,40,50,60,70),
  store = c("a", "a", "a", "b", "b", "b", "d"),
  date = c(1,1,2,3,3,3,3)
)

sales <- data.table(
  sale_id = c(1,2,3,4,5,6,7,8),
  store = c("a", "a", "a", "b", "b", "b", "b", "c"),
  date = c(1,1,1,2,3,3,4,5)
)

purchases
   purchase_id store date
1:          10     a    1
2:          20     a    1
3:          30     a    2
4:          40     b    3
5:          50     b    3
6:          60     b    3
7:          70     d    3

包括商店中的额外购买 d

sales
   sale_id store date
1:       1     a    1
2:       2     a    1
3:       3     a    1
4:       4     b    2
5:       5     b    3
6:       6     b    3
7:       7     b    4
8:       8     c    5

包括 2 个额外的销售(第 4 和 8 行)和一个额外的商店 c

说明

第一个表达式

sales[purchases, on = c("store", "date>=date"), 
  .(store, purchase_id, sale_id = sale_id[x.date >= i.date])]

returns purchase_id 与有效 sale_id 的所有可能组合,即仅包含销售日期 x.date 的那些 sale_id或购买日期 i.date 之后(每家商店):

    store purchase_id sale_id
 1:     a          10       1
 2:     a          10       2
 3:     a          10       3
 4:     a          20       1
 5:     a          20       2
 6:     a          20       3
 7:     a          30      NA
 8:     b          40       5
 9:     b          40       6
10:     b          40       7
11:     b          50       5
12:     b          50       6
13:     b          50       7
14:     b          60       5
15:     b          60       6
16:     b          60       7
17:     d          70      NA

第二个表达式

[rowid(store, purchase_id) == rowid(store, sale_id)]

purchase_id 的每个唯一值和 sale_id 的每个唯一值创建 ID 号,并通过匹配 ID 号创建子集。

我将保留我的其他答案, 但我意识到您可以使用类似的逻辑来完全避免连接。 但是,要使其正常工作,您需要事先对 table 进行排序。 如果那是acceptable, 如果您的 table 很大,它可能会节省大量内存。

setkey(purchases, store, date, purchase_id)
setkey(sales, store, date, sale_id)

purchases[, sale_id := {
  # j is the index for sales, make sure we start at the same store as purchases
  j <- 1L
  while (j <= nrow(sales) && sales$store[j] != store[1L]) {
    j <- j + 1L
  }
  if (j > nrow(sales)) {
    return(NA_real_)
  }
  
  sale_id <- purchase_id
  i <- 1L
  
  while (i <= length(purchase_id) && j <= nrow(sales)) {
    # if stores no longer match, add NA until they do
    while (i <= length(purchase_id) && sales$store[j] != store[i]) {
      sale_id[i] <- NA_real_
      i <- i + 1L
    }
    
    if (i > length(purchase_id)) {
      break
    }
    
    # move sales' cursor until we find a date that's at or after current purchase
    while (j <= nrow(sales) && sales$store[j] == store[i] && sales$date[j] < date[i]) {
      j <- j + 1L
    }
    
    # if j is still valid and stores still match, we found a valid sale date
    # otherwise no valid sale was found, so enter NA
    if (j <= nrow(sales) && sales$store[j] == store[i]) {
      sale_id[i] <- sales$sale_id[j]
      i <- i + 1L
      j <- j + 1L
    }
    else {
      sale_id[i] <- NA_real_
      i <- i + 1L
    }
  }
  
  sale_id
}]

本质上,您只需为每个 table 保留游标并利用列已排序的事实。 当找到匹配的销售时,你增加两个光标, 但只有在不是这种情况时才移动其中一个光标, 取决于 storedate 是否不再对匹配有效。