使用 sparklyr 按时间戳 + ID 查找缺失的行

Find missing rows by timestamp + ID with sparklyr

我试图找到丢失的时间戳。这里有很多解决这个单一问题的解决方案。尽管如此,我还想查找 "where" 时间戳 ID 丢失。

因此,例如测试数据集将如下所示:

elemuid timestamp
1232    2018-02-10 23:00:00
1232    2018-02-10 23:01:00
1232    2018-02-10 22:58:00
1674    2018-02-10 22:40:00
1674    2018-02-10 22:39:00
1674    2018-02-10 22:37:00
1674    2018-02-10 22:35:00

解决方案应该是这样的:

elemuid timestamp
1232    2018-02-10 22:59:00
1674    2018-02-10 22:38:00
1674    2018-02-10 22:36:00

我的问题是我只能使用 dplyr,因为我想在 sparklyr 中也使用此代码。 如果能得到您的帮助,我将非常高兴!

这是 anti_join 的一个选项。假设 'timestamp' 列不是 Datetime 对象,我们将其转换为 POSIXct

library(tidyverse)
df1 <- df1 %>%
          mutate(timestamp = ymd_hms(timestamp)) 

按 'elemuid' 分组,使用 complete 将 'timestamp' 扩展 1 分钟,并对原始数据集

执行 anti_join
df1 %>%
    group_by(elemuid) %>% 
    complete(timestamp = seq(min(timestamp), max(timestamp), by = "1 min")) %>% 
    anti_join(df1)
# A tibble: 3 x 2
# Groups: elemuid [?]
#   elemuid timestamp          
#     <int> <dttm>             
#1    1232 2018-02-10 22:59:00
#2    1674 2018-02-10 22:36:00
#3    1674 2018-02-10 22:38:00

为简单起见,我们假设您已经在几秒钟内完成了 , and computed (min_max) Epoch time

剩下的步骤与我们之前遵循的步骤非常相似:

  • 生成值范围:

    epoch_range <- spark_session(sc) %>% 
      invoke("range", as.integer(min_max[1]), as.integer(min_max[2]), 60L) %>%
      invoke("withColumnRenamed", "id", "timestamp")
    
  • 计算不同 elemuid

    elemuids <- df %>% select(elemuid) %>% distinct() %>% spark_dataframe()
    

现在,我们要生成引用 table 作为范围和唯一 ID 的笛卡尔积:

ref <- epoch_range %>% 
  invoke("crossJoin", elemuids) %>% 
  sdf_register() %>%
  mutate(timestamp = from_unixtime(timestamp, "yyyy-MM-dd HH:mm:ss.SSS"))

您可能想使用更多 dplyr 风格的方法:

sdf_register(epoch_range) %>% mutate(dummy = 1) %>% 
  left_join(sdf_register(elemuids) %>% mutate(dummy = 1), by = "dummy") %>%
  select(-dummy)

如果产品的大小很小(在这种情况下 Spark 应该使用广播连接),这会很好地工作,但否则会导致完全数据倾斜,因此通常使用不安全。

最后我们将像以前一样外连接数据:

ref %>% left_join(df, by = c("timestamp", "elemuid"))

补东西,或者(如 provided by akrun)反加入去掉漏点:

ref %>% anti_join(df, by = c("timestamp", "elemuid"))