Run Length Encoding within SQL Query

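I have time series data that I am summarizing with run-length encoding plus some additional summary statistics. The problem is that the data has at least 40 million rows and I only have 16 GB of RAM. At the moment I have to run the same operation on batches of the data and then append the results together. The whole process currently takes more than a day. I know for loops are bad, but running my current query on everything at once crashes RStudio...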

I am hoping someone can help me turn my for loop into a function and then run it with the parallel package in R. Or just optimize my original query?

The for loop queries one subset of customers at a time, so I will do my best to create a reproducible example.

library(DBI)
library(dbplyr)
library(dplyr)
library(data.table)
library(lubridate)  # provides ymd_hms(), used below

customers <- data.frame(
  customer.number = c(12345, 23456, 34567, 45678, 56789)
)

n <- 2
nr <- nrow(customers)
X <- split(customers, rep(1:ceiling(nr/n), each=n, length.out=nr))

consumption <- data.frame(
  customer.number = c(12345, 12345, 12345,
                      23456, 23456, 23456, 
                      34567, 34567, 34567, 
                      45678, 45678, 45678, 
                      56789, 56789, 56789),
  consumption = c(1,2,3,
                  0,0,1,
                  1,0,1,
                  2,2,0,
                  0,0,0),
  datetime = c("2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00",
               "2022-01-01 00:00:00", "2022-01-01 01:00:00", "2022-01-01 02:00:00")
)

beginning <- ymd_hms("2022-01-01 00:00:00")
ending <- ymd_hms("2022-02-01 00:00:00")

for(i in 1:length(X)){
  
  rle <- tbl(connection, "consumption") %>%
    select(customer.number, consumption, datetime) %>%
    mutate(flag = if_else(consumption >= 1, TRUE, FALSE)) %>%
    filter(customer.number %in% !!X[[i]]$customer.number,
           datetime >= !!beginning, 
           datetime < !!ending) %>%
    collect() %>%
    arrange(customer.number, datetime) %>%
    group_by(customer.number, Run = data.table::rleid(customer.number, flag), flag) %>%
    summarize(Start = min(datetime), 
              End = max(datetime), 
              Length = length(Run),
              Min.Consumption = min(consumption),
              Avg.Consumption = mean(consumption),
              Max.Consumption = max(consumption)) %>%
    filter(flag != FALSE)
  
  # Append this batch to the results saved on disk; the first batch creates the file.
  if(names(X)[i] == "1"){
    results <- rle
  } else {
    results <- readRDS("results.rds") %>%
      rbind(rle)
  }
  
  saveRDS(results, file = "results.rds")
  
  remove(results, rle)
  
  print(names(X)[i])
  
}

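Here is a way to do all of it on the database. Note that no loop is needed and that the collect() statement comes last, so only the summarized runs are pulled into R. The query uses customer_number as the column name, as shown in the output.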

tbl(connection, "consumption" ) %>%
  mutate(flag = if_else(consumption>0,1,0)) %>%
  filter(datetime >= !!beginning, datetime < !!ending) %>%
  group_by(flag) %>%
  window_order(customer_number, datetime) %>% 
  mutate(num2 = row_number()) %>% 
  ungroup() %>% 
  mutate(Run = row_number()-num2) %>% 
  select(-num2) %>% 
  group_by(customer_number, Run, flag) %>%
  summarize(Start = min(datetime), 
            End = max(datetime),
            Length = n(),
            Min.Consumption = min(consumption),
            Avg.Consumption = mean(consumption),
            Max.Consumption = max(consumption), .groups="drop") %>% 
  filter(flag==1) %>% 
  collect()

Output:

  customer_number     Run  flag Start               End                 Length Min.Consumption Avg.Consumption Max.Consumption
  <chr>           <int64> <dbl> <dttm>              <dttm>               <int>           <int>           <int>           <int>
1 12345                 0     1 2022-01-01 00:00:00 2022-01-01 02:00:00      3               1               2               3
2 23456                 2     1 2022-01-01 02:00:00 2022-01-01 02:00:00      1               1               1               1
3 34567                 2     1 2022-01-01 00:00:00 2022-01-01 00:00:00      1               1               1               1
4 34567                 3     1 2022-01-01 02:00:00 2022-01-01 02:00:00      1               1               1               1
5 45678                 3     1 2022-01-01 00:00:00 2022-01-01 01:00:00      2               2               2               2
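
To see why subtracting the two row numbers identifies runs, here is a minimal base R sketch of the same idea on the sample data, with no database involved. The column names overall and within_flag are illustrative only and are not part of the query above. The data is assumed to be already sorted by customer and datetime.

```r
# Sample consumption values from the question, sorted by customer and time.
consumption <- data.frame(
  customer_number = rep(c(12345, 23456, 34567, 45678, 56789), each = 3),
  consumption = c(1, 2, 3,  0, 0, 1,  1, 0, 1,  2, 2, 0,  0, 0, 0)
)
consumption$flag <- as.integer(consumption$consumption > 0)

# Row number over the whole table, in customer/time order.
consumption$overall <- seq_len(nrow(consumption))

# Row number restarted within each flag value, in the same order.
consumption$within_flag <- ave(consumption$overall, consumption$flag,
                               FUN = seq_along)

# Inside a run both counters advance together, so the difference stays
# constant; when the flag flips, only one of them advances and it changes.
consumption$Run <- consumption$overall - consumption$within_flag

# Count rows per (customer, Run) for the flag == 1 rows only.
runs <- aggregate(
  consumption ~ customer_number + Run,
  data = consumption[consumption$flag == 1, ],
  FUN = length
)
names(runs)[3] <- "Length"
runs <- runs[order(runs$customer_number, runs$Run), ]
print(runs)
```

The Run value on its own is not unique: in this data customers 23456 and 34567 both have a run numbered 2. That is why the grouping has to include customer_number and flag as well as Run. The resulting run lengths are 3, 1, 1, 1 and 2, which matches the Length column in the output above.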