在 Cassandra 中将宽格式 table 转换为长格式
Convert wide table to long format in Cassandra
不幸的是,客户给了我一个非常凌乱和非常大的 table (csv)。它是宽格式的:'(
例如,列是:
Name, Date, Usage_Hr1, Usage_Hr2, ..., Usage_Hr24, ... lots more columns
我通常只是将 .csv 加载到 R
并使用 tidyr
包中的 gather
,但是数据太大了。我考虑过将数据加载到 sparklyr
中,但是 sparklyr
中还没有 gather
函数...
所以我的问题是,一旦我 COPY
将我的 table 编辑到 Cassandra 中(将 PRIMARY KEY
设置为名称和日期),我如何将这些列转换为 long格式?我只是运气不好吗?顺便说一句,我不是数据库专家,所以我对此一无所知。
注意 我使用的是最新版本的 Cassandra,我当前的 table 大约是 1000 万行。
在 Spark 中你可以使用 explode
函数,,这样做 sparklyr
有点复杂。
初始化和示例数据:
library(stringi)
sc <- spark_connect("local[*]")
df <- data.frame(A = c("a", "b", "c"), B = c(1, 3, 5), C = c(2, 4, 6))
sdf <- copy_to(sc, df, overwrite =TRUE)
辅助函数:
#' Given name, return corresponding SQL function
sqlf <- function(f) function(x, ...) {
invoke_static(sc, "org.apache.spark.sql.functions", f, x, ...)
}
熔解函数:
#' @param df tbl_spark
#' @param sc spark_connection
#' @param id_vars id columns
#'
melt <- function(df, sc, id_vars, value_vars = NULL,
var_name = "key", value_name = "value") {
# Alias for the output view
alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_")
# Get session and JVM object
spark <- spark_session(sc)
jdf <- spark_dataframe(df)
# Convert characters to JVM Columns
j_id_vars <- lapply(id_vars, sqlf("col"))
# Combine columns into array<struct<key,value>> and explode
exploded <- sqlf("explode")(sqlf("array")(lapply(value_vars, function(x) {
key <- sqlf("lit")(x) %>% invoke("alias", var_name)
value <- sqlf("col")(x) %>% invoke("alias", value_name)
sqlf("struct")(list(key, value))
})))
# expand struct<..., struct<key, value>> into struct<..., key, value>
exprs <- lapply(
c(id_vars, paste("col", c(var_name, value_name), sep = ".")),
sqlf("col"))
# Explode and register as temp table
jdf %>%
invoke("withColumn", "col", exploded) %>%
invoke("select", exprs) %>%
invoke("createOrReplaceTempView", alias)
dplyr::tbl(sc, alias)
}
用法示例:
melt(sdf, sc, "A", c("B", "C"))
## Source: query [6 x 3]
## Database: spark connection master=local[*] app=sparklyr local=TRUE
##
## # A tibble: 6 x 3
## A key value
## <chr> <chr> <dbl>
## 1 a B 1
## 2 a C 2
## 3 b B 3
## 4 b C 4
## 5 c B 5
## 6 c C 6
不幸的是,客户给了我一个非常凌乱和非常大的 table (csv)。它是宽格式的:'(
例如,列是:
Name, Date, Usage_Hr1, Usage_Hr2, ..., Usage_Hr24, ... lots more columns
我通常只是将 .csv 加载到 R
并使用 tidyr
包中的 gather
,但是数据太大了。我考虑过将数据加载到 sparklyr
中,但是 sparklyr
中还没有 gather
函数...
所以我的问题是,一旦我 COPY
将我的 table 编辑到 Cassandra 中(将 PRIMARY KEY
设置为名称和日期),我如何将这些列转换为 long格式?我只是运气不好吗?顺便说一句,我不是数据库专家,所以我对此一无所知。
注意 我使用的是最新版本的 Cassandra,我当前的 table 大约是 1000 万行。
在 Spark 中你可以使用 explode
函数,sparklyr
有点复杂。
初始化和示例数据:
library(stringi)
sc <- spark_connect("local[*]")
df <- data.frame(A = c("a", "b", "c"), B = c(1, 3, 5), C = c(2, 4, 6))
sdf <- copy_to(sc, df, overwrite =TRUE)
辅助函数:
#' Given name, return corresponding SQL function
sqlf <- function(f) function(x, ...) {
invoke_static(sc, "org.apache.spark.sql.functions", f, x, ...)
}
熔解函数:
#' @param df tbl_spark
#' @param sc spark_connection
#' @param id_vars id columns
#'
melt <- function(df, sc, id_vars, value_vars = NULL,
var_name = "key", value_name = "value") {
# Alias for the output view
alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_")
# Get session and JVM object
spark <- spark_session(sc)
jdf <- spark_dataframe(df)
# Convert characters to JVM Columns
j_id_vars <- lapply(id_vars, sqlf("col"))
# Combine columns into array<struct<key,value>> and explode
exploded <- sqlf("explode")(sqlf("array")(lapply(value_vars, function(x) {
key <- sqlf("lit")(x) %>% invoke("alias", var_name)
value <- sqlf("col")(x) %>% invoke("alias", value_name)
sqlf("struct")(list(key, value))
})))
# expand struct<..., struct<key, value>> into struct<..., key, value>
exprs <- lapply(
c(id_vars, paste("col", c(var_name, value_name), sep = ".")),
sqlf("col"))
# Explode and register as temp table
jdf %>%
invoke("withColumn", "col", exploded) %>%
invoke("select", exprs) %>%
invoke("createOrReplaceTempView", alias)
dplyr::tbl(sc, alias)
}
用法示例:
melt(sdf, sc, "A", c("B", "C"))
## Source: query [6 x 3]
## Database: spark connection master=local[*] app=sparklyr local=TRUE
##
## # A tibble: 6 x 3
## A key value
## <chr> <chr> <dbl>
## 1 a B 1
## 2 a C 2
## 3 b B 3
## 4 b C 4
## 5 c B 5
## 6 c C 6