Complete time-series with sparklyr
I am trying to find the missing minutes in my time-series dataset. I wrote R code that works well locally on a small sample:
test <- dfv %>%
  mutate(timestamp = as.POSIXct(DaySecFrom.UTC.)) %>%
  complete(timestamp = seq.POSIXt(min(timestamp), max(timestamp), by = 'min'), ElemUID)
But complete() from tidyr cannot be used on a spark_tbl:
Error in UseMethod("complete_") :
no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"
Here is some test data:
ElemUID    ElemName        Kind  Number  DaySecFrom(UTC)          DaySecTo(UTC)
399126817  A648/13FKO-66   DEZ           2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
483492732  A661/18FRS-97   DEZ   120.00  2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
399126819  A648/12FKO-2    DEZ    60.00  2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
399126818  A648/12FKO-1    DEZ   180.00  2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
399126816  A648/13FKO-65   DEZ           2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
398331142  A661/31OFN-1    DEZ   120.00  2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
398331143  A661/31OFN-2    DEZ           2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
483492739  A5/28FKN-65     DEZ           2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
483492735  A661/23FRS-97   DEZ    60.00  2017-07-01 23:58:00.000  2017-07-01 23:59:00.000
Is there any other way or workaround to solve this task on a Spark cluster from R? Any help would be greatly appreciated!
Find the minimum and maximum values as epoch time:
library(sparklyr)
library(dplyr)
library(tibble)

# sc is an existing Spark connection, e.g. sc <- spark_connect(master = "local")
df <- copy_to(sc, tibble(id = 1:4, timestamp = c(
  "2017-07-01 23:49:00.000", "2017-07-01 23:50:00.000",
  # 6 minute gap
  "2017-07-01 23:56:00.000",
  # 1 minute gap
  "2017-07-01 23:58:00.000")
), "df", overwrite = TRUE)

min_max <- df %>%
  summarise(min(unix_timestamp(timestamp)), max(unix_timestamp(timestamp))) %>%
  collect() %>%
  unlist()
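For illustration (my own addition, not part of the original answer), min_max is simply a numeric vector of two epoch-second values, so a quick sanity check can confirm the window is non-empty and minute-aligned before building the reference range:

# Sanity-check sketch: both bounds exist, are ordered, and fall on whole minutes
stopifnot(
  length(min_max) == 2,
  min_max[1] <= min_max[2],
  all(min_max %% 60 == 0)
)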
Generate a reference range from min(epoch_time) to max(epoch_time) + interval:
library(glue)

query <- glue("SELECT id AS timestamp FROM RANGE({min_max[1]}, {min_max[2] + 60}, 60)") %>%
  as.character()

ref <- spark_session(sc) %>%
  invoke("sql", query) %>%
  sdf_register() %>%
  mutate(timestamp = from_unixtime(timestamp, "yyyy-MM-dd HH:mm:ss.SSS"))
Left (outer) join:
ref %>% left_join(df, by="timestamp")
# Source:   lazy query [?? x 2]
# Database: spark_connection
   timestamp                  id
   <chr>                   <int>
 1 2017-07-01 23:49:00.000     1
 2 2017-07-01 23:50:00.000     2
 3 2017-07-01 23:51:00.000    NA
 4 2017-07-01 23:52:00.000    NA
 5 2017-07-01 23:53:00.000    NA
 6 2017-07-01 23:54:00.000    NA
 7 2017-07-01 23:55:00.000    NA
 8 2017-07-01 23:56:00.000     3
 9 2017-07-01 23:57:00.000    NA
10 2017-07-01 23:58:00.000     4
# ... with more rows
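Since the original goal is to find the missing minutes themselves, a small usage sketch (my own addition, reusing the ref and df tables defined above) keeps only the reference minutes that have no matching observation:

# Sketch: keep only the minutes with no observation in df
missing_minutes <- ref %>%
  anti_join(df, by = "timestamp") %>%
  arrange(timestamp)

missing_minutes %>% collect()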
Note: if you run into issues related to SPARK-20145, you can replace the SQL query with:
spark_session(sc) %>%
  invoke("range", as.integer(min_max[1]), as.integer(min_max[2]) + 60L, 60L) %>%  # end is exclusive, so add one interval
  sdf_register()
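To apply this to the question's data, every minute is needed for every ElemUID, not just for a single series. Below is a hedged sketch of one way to extend the approach (my own addition, not part of the answer above; it assumes the question's table is registered in Spark as dfv with an ElemUID column and a string minute column DaySecFrom_UTC in the same format as ref$timestamp). It cross joins the reference minutes with the distinct keys via a dummy column, then joins the observations back:

# Sketch: complete the series per ElemUID (assumed table and column names)
keys <- dfv %>%
  distinct(ElemUID) %>%
  mutate(dummy = 1L)                  # dummy key for the cross join

grid <- ref %>%
  mutate(dummy = 1L) %>%
  inner_join(keys, by = "dummy") %>%  # every reference minute x every ElemUID
  select(-dummy)

completed <- grid %>%
  left_join(
    dfv %>% mutate(timestamp = DaySecFrom_UTC),
    by = c("timestamp", "ElemUID")
  )

Rows of completed with NA in the value columns then mark the missing minutes for each ElemUID.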