使用聚合和过滤器进行高效的交叉连接
Efficient cross join with aggregation and filter
根据标题,我希望使用 table 执行交叉连接,它执行聚合函数并过滤 table 中的几个变量。
我有与以下类似的数据:
library(dplyr)
library(data.table)
library(sqldf)
sales <- data.frame(salesx = c(3000, 2250,850,1800,1700,560,58,200,965,1525)
,week = seq(from = 1, to = 10, by = 1)
,uplift = c(0.04)
,slope = c(100)
,carryover = c(.35))
spend <- data.frame(spend = seq(from = 1, to = 50000, by = 1))
tempdata <- merge(spend,sales,all=TRUE)
tempdata$singledata <- as.numeric(1)
下面是我试图通过基于 sql 的解决方案完成的示例:
newdata <- sqldf("select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from tempdata a, tempdata b
where a.spend = b.spend
group by a.spend,a.week")
这提供了我想要的结果,但速度有点慢,尤其是我的真实数据集大约有 100 万条记录。最好能就 a) 如何加速 sqldf 函数提出一些建议;或 b) 使用更有效的数据。table/dplyr 方法(我无法理解交叉 join/aggregation/filter 三重问题)。
关于 non-equi 的澄清加入以下解决方案:
我有几个关于 non-equi 加入解决方案的问题 – 输出很好而且很快。为了了解代码的工作原理,我将其分解如下:
breakdown <- setDT(tempdata)[tempdata, .(spend, uplift, slope,carryover,salesx, singledata, week, i.week,x.week, i.salesx,x.salesx, x.spend, i.spend), on=.(spend, week > week)]
根据细分,为了和原来的计算一致,应该是:
x.salesx*(uplift*(1.0-exp(-(`^`(singledata,x.week-week)/slope))))/i.spend
之所以不明显,是因为在示例中我使用了等式的“幂”部分并没有真正做任何事情(总是 1)。实际使用的计算是(向数据添加结转变量):
SQL
b.salesx*(b.uplift*(1-exp(-(power((b.singledata*b.carryover),b.week-a.week)/b.slope))))/b.spend (sql)
我的data.table解决方案
sum(salesx.y*(uplift.y*(1-exp(-((singledata.y*adstock.y)^(week.y-week.x)/slope.y))))/spend), by=list(spend, week.x)
但是,在添加“结转”变量时,我无法使用非等值连接解决方案,即。
x.salesx*(uplift*(1.0-exp(-(`^`((singledata*carryover),x.week-week)/slope))))/i.spend
终于有时间再次调查了:
我原来的解决方案:
system.time(newdata <- sqldf("select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from tempdata a, tempdata b
where a.spend = b.spend
group by a.spend,a.week"))
user system elapsed
11.99 3.77 16.11
有索引(虽然有些东西告诉我这不能正常工作):
system.time(newdata2 <- sqldf(c('create index newindex on tempdata(spend)',
'select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from main.tempdata a left join main.tempdata b
on a.spend = b.spend
group by a.spend,a.week'), dbname = tempfile()))
user system elapsed
12.73 2.93 15.76
Data.table解决方案(sql中ifelse语句中的return不是0):
datatablefunc <- function(g){
tempdata2 <- as.data.table(g)
setkey(tempdata2, spend)
tempdata3 <- merge(tempdata2, tempdata2, by="spend", all=TRUE, allow.cartesian=TRUE)
tempdata4 <- tempdata3[week.y > week.x, sum(salesx.y*(uplift.y*(1-exp(-(singledata.y^(week.y-week.x)/slope.y))))/spend), by=list(spend, week.x)]
return(tempdata4)
}
system.time(newdata3 <- datatablefunc(tempdata))
user system elapsed
2.36 0.25 2.62
基于 sql 的解决方案的美妙之处在于,因为临时输出存储在 sql 服务器中而不是内存中,所以我不会 运行 讨厌 'cannot allocate vector' 问题,这与数据有关。table/dplyr 解决方案(当我添加更多数据时)...缺点是 运行 需要更长的时间。
data.table
1.9.8 版(2016 年 11 月 25 日在 CRAN 上)引入了非等值连接,这有助于避免消耗内存的交叉连接:
library(data.table)
newdata4 <-
# coerce to data.table
setDT(tempdata)[
# non-equi self-join
tempdata, on = .(spend, week > week),
# compute result
.(calc3 = sum(salesx*(uplift*(1.0-exp(-(`^`(singledata,week-i.week)/slope))))/i.spend)),
# grouped by join parameters
by = .EACHI][
# replace NA
is.na(calc3), calc3 := 0.0][]
# check that results are equal
all.equal(newdata, as.data.frame(newdata4[order(spend, week)]))
[1] TRUE
基准
OP 提供了 解决方案、两个 sqldf
变体和一个 data.table
使用交叉连接的方法。将这些与非相等连接进行比较。
下面的代码
dt_tempdata <- data.table(tempdata)
microbenchmark::microbenchmark(
sqldf = {
newdata <- sqldf("select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from tempdata a, tempdata b
where a.spend = b.spend
group by a.spend,a.week")
},
sqldf_idx = {
newdata2 <- sqldf(c('create index newindex on tempdata(spend)',
'select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from main.tempdata a left join main.tempdata b
on a.spend = b.spend
group by a.spend,a.week'), dbname = tempfile())
},
dt_merge = {
newdata3 <- merge(dt_tempdata, dt_tempdata, by="spend", all=TRUE, allow.cartesian=TRUE)[
week.y > week.x,
.(calc3 = sum(salesx.y*(uplift.y*(1-exp(-(singledata.y^(week.y-week.x)/slope.y)))))),
by=.(spend, week.x)]
},
dt_nonequi = {
newdata4 <- dt_tempdata[
dt_tempdata, on = .(spend, week > week),
.(calc3 = sum(salesx*(uplift*(1.0-exp(-(`^`(singledata,week-i.week)/slope))))/i.spend)),
by = .EACHI][is.na(calc3), calc3 := 0.0]
},
times = 3L
)
returns 这些时间:
Unit: seconds
expr min lq mean median uq max neval cld
sqldf 9.456110 10.081704 10.647193 10.707299 11.242735 11.778171 3 b
sqldf_idx 10.980590 11.477774 11.734239 11.974958 12.111064 12.247170 3 b
dt_merge 3.037857 3.147274 3.192227 3.256692 3.269412 3.282131 3 a
dt_nonequi 1.768764 1.776581 1.792359 1.784397 1.804156 1.823916 3 a
对于给定的问题大小,非等连接是最快的,几乎是 merge/cross-join data.table
方法的两倍,比 sqldf
代码快 6 倍。有趣的是,索引创建 and/or 临时文件的使用在我的系统上似乎相当昂贵。
请注意,我已经简化了 OP 的 data.table
解决方案。
最后,除merge/cross-join(我已避免修复此版本)外的所有版本return结果相同。
all.equal(newdata, newdata2) # TRUE
all.equal(newdata, as.data.frame(newdata3[order(spend, week.x)])) # FALSE (last week missing)
all.equal(newdata, as.data.frame(newdata4[order(spend, week)])) # TRUE
更大的问题
OP 报告说 merge/cross-join data.table
解决方案 运行 的 1 M 行生产数据集内存不足。为了验证非 equi 连接方法消耗的内存更少,我用 5 M 行 (nrow(tempdata)
) 的问题大小对其进行了测试,这比之前的基准 运行s 大十倍。在我的 8 GB 内存的 PC 上,运行 在大约 18 秒内顺利完成。
Unit: seconds
expr min lq mean median uq max neval
dt_nonequi 18.12387 18.12657 18.23454 18.12927 18.28987 18.45047 3
根据标题,我希望使用 table 执行交叉连接,它执行聚合函数并过滤 table 中的几个变量。
我有与以下类似的数据:
library(dplyr)
library(data.table)
library(sqldf)
sales <- data.frame(salesx = c(3000, 2250,850,1800,1700,560,58,200,965,1525)
,week = seq(from = 1, to = 10, by = 1)
,uplift = c(0.04)
,slope = c(100)
,carryover = c(.35))
spend <- data.frame(spend = seq(from = 1, to = 50000, by = 1))
tempdata <- merge(spend,sales,all=TRUE)
tempdata$singledata <- as.numeric(1)
下面是我试图通过基于 sql 的解决方案完成的示例:
newdata <- sqldf("select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from tempdata a, tempdata b
where a.spend = b.spend
group by a.spend,a.week")
这提供了我想要的结果,但速度有点慢,尤其是我的真实数据集大约有 100 万条记录。最好能就 a) 如何加速 sqldf 函数提出一些建议;或 b) 使用更有效的数据。table/dplyr 方法(我无法理解交叉 join/aggregation/filter 三重问题)。
关于 non-equi 的澄清加入以下解决方案:
我有几个关于 non-equi 加入解决方案的问题 – 输出很好而且很快。为了了解代码的工作原理,我将其分解如下:
breakdown <- setDT(tempdata)[tempdata, .(spend, uplift, slope,carryover,salesx, singledata, week, i.week,x.week, i.salesx,x.salesx, x.spend, i.spend), on=.(spend, week > week)]
根据细分,为了和原来的计算一致,应该是:
x.salesx*(uplift*(1.0-exp(-(`^`(singledata,x.week-week)/slope))))/i.spend
之所以不明显,是因为在示例中我使用了等式的“幂”部分并没有真正做任何事情(总是 1)。实际使用的计算是(向数据添加结转变量):
SQL
b.salesx*(b.uplift*(1-exp(-(power((b.singledata*b.carryover),b.week-a.week)/b.slope))))/b.spend (sql)
我的data.table解决方案
sum(salesx.y*(uplift.y*(1-exp(-((singledata.y*adstock.y)^(week.y-week.x)/slope.y))))/spend), by=list(spend, week.x)
但是,在添加“结转”变量时,我无法使用非等值连接解决方案,即。
x.salesx*(uplift*(1.0-exp(-(`^`((singledata*carryover),x.week-week)/slope))))/i.spend
终于有时间再次调查了:
我原来的解决方案:
system.time(newdata <- sqldf("select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from tempdata a, tempdata b
where a.spend = b.spend
group by a.spend,a.week"))
user system elapsed
11.99 3.77 16.11
有索引(虽然有些东西告诉我这不能正常工作):
system.time(newdata2 <- sqldf(c('create index newindex on tempdata(spend)',
'select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from main.tempdata a left join main.tempdata b
on a.spend = b.spend
group by a.spend,a.week'), dbname = tempfile()))
user system elapsed
12.73 2.93 15.76
Data.table解决方案(sql中ifelse语句中的return不是0):
datatablefunc <- function(g){
tempdata2 <- as.data.table(g)
setkey(tempdata2, spend)
tempdata3 <- merge(tempdata2, tempdata2, by="spend", all=TRUE, allow.cartesian=TRUE)
tempdata4 <- tempdata3[week.y > week.x, sum(salesx.y*(uplift.y*(1-exp(-(singledata.y^(week.y-week.x)/slope.y))))/spend), by=list(spend, week.x)]
return(tempdata4)
}
system.time(newdata3 <- datatablefunc(tempdata))
user system elapsed
2.36 0.25 2.62
基于 sql 的解决方案的美妙之处在于,因为临时输出存储在 sql 服务器中而不是内存中,所以我不会 运行 讨厌 'cannot allocate vector' 问题,这与数据有关。table/dplyr 解决方案(当我添加更多数据时)...缺点是 运行 需要更长的时间。
data.table
1.9.8 版(2016 年 11 月 25 日在 CRAN 上)引入了非等值连接,这有助于避免消耗内存的交叉连接:
library(data.table)
newdata4 <-
# coerce to data.table
setDT(tempdata)[
# non-equi self-join
tempdata, on = .(spend, week > week),
# compute result
.(calc3 = sum(salesx*(uplift*(1.0-exp(-(`^`(singledata,week-i.week)/slope))))/i.spend)),
# grouped by join parameters
by = .EACHI][
# replace NA
is.na(calc3), calc3 := 0.0][]
# check that results are equal
all.equal(newdata, as.data.frame(newdata4[order(spend, week)]))
[1] TRUE
基准
OP 提供了 sqldf
变体和一个 data.table
使用交叉连接的方法。将这些与非相等连接进行比较。
下面的代码
dt_tempdata <- data.table(tempdata)
microbenchmark::microbenchmark(
sqldf = {
newdata <- sqldf("select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from tempdata a, tempdata b
where a.spend = b.spend
group by a.spend,a.week")
},
sqldf_idx = {
newdata2 <- sqldf(c('create index newindex on tempdata(spend)',
'select a.spend, a.week,
sum(case when b.week > a.week
then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
else 0.0 end) as calc3
from main.tempdata a left join main.tempdata b
on a.spend = b.spend
group by a.spend,a.week'), dbname = tempfile())
},
dt_merge = {
newdata3 <- merge(dt_tempdata, dt_tempdata, by="spend", all=TRUE, allow.cartesian=TRUE)[
week.y > week.x,
.(calc3 = sum(salesx.y*(uplift.y*(1-exp(-(singledata.y^(week.y-week.x)/slope.y)))))),
by=.(spend, week.x)]
},
dt_nonequi = {
newdata4 <- dt_tempdata[
dt_tempdata, on = .(spend, week > week),
.(calc3 = sum(salesx*(uplift*(1.0-exp(-(`^`(singledata,week-i.week)/slope))))/i.spend)),
by = .EACHI][is.na(calc3), calc3 := 0.0]
},
times = 3L
)
returns 这些时间:
Unit: seconds expr min lq mean median uq max neval cld sqldf 9.456110 10.081704 10.647193 10.707299 11.242735 11.778171 3 b sqldf_idx 10.980590 11.477774 11.734239 11.974958 12.111064 12.247170 3 b dt_merge 3.037857 3.147274 3.192227 3.256692 3.269412 3.282131 3 a dt_nonequi 1.768764 1.776581 1.792359 1.784397 1.804156 1.823916 3 a
对于给定的问题大小,非等连接是最快的,几乎是 merge/cross-join data.table
方法的两倍,比 sqldf
代码快 6 倍。有趣的是,索引创建 and/or 临时文件的使用在我的系统上似乎相当昂贵。
请注意,我已经简化了 OP 的 data.table
解决方案。
最后,除merge/cross-join(我已避免修复此版本)外的所有版本return结果相同。
all.equal(newdata, newdata2) # TRUE
all.equal(newdata, as.data.frame(newdata3[order(spend, week.x)])) # FALSE (last week missing)
all.equal(newdata, as.data.frame(newdata4[order(spend, week)])) # TRUE
更大的问题
OP 报告说 merge/cross-join data.table
解决方案 运行 的 1 M 行生产数据集内存不足。为了验证非 equi 连接方法消耗的内存更少,我用 5 M 行 (nrow(tempdata)
) 的问题大小对其进行了测试,这比之前的基准 运行s 大十倍。在我的 8 GB 内存的 PC 上,运行 在大约 18 秒内顺利完成。
Unit: seconds expr min lq mean median uq max neval dt_nonequi 18.12387 18.12657 18.23454 18.12927 18.28987 18.45047 3