使用 sdf_pivot() 函数旋转后如何获取列值(dcast 中的 value.var)
How to get column values (value.var in dcast) after pivoting using sdf_pivot() function
我正在尝试使用 sdf_pivot() 函数对我的 spark 数据帧做类似 dcast 的透视。我想像 reshape2 包中 dcast() 的 value.var 参数那样,把某一列的值填入透视结果的单元格。请看下面的例子。
# Minimal long-format example: 11 rows of (id, name, value).
id <- c(1,1,1,1,1,2,2,2,3,3,3)
name <- c("A","B","C","D","E","A","B","C","D","E","F")
value <- c(1,2,3,1,1,2,3,1,1,2,3)
dt <- data.frame(id,name,value)
# Pivot wide: one column per `name`, cells filled from `value` via
# value.var; (id, name) pairs with no matching row become NA.
reshape2::dcast(dt,id~name,value.var = "value")
output1-
id A B C D E F
1 1 1 2 3 1 1 NA
2 2 2 3 1 NA NA NA
3 3 NA NA NA 1 2 3
# Copy the local data frame into Spark, then pivot id ~ name.
# sdf_pivot() has no value.var argument; the 1/NaN cells in the output
# below suggest its default aggregate ignores `value` (presumably a
# count / presence indicator -- TODO confirm against sparklyr docs).
spark_dt <- copy_to(sc, dt)
sdf_pivot(spark_dt,id~name)
output2-
id A B C D E F
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 1 1 1 1 1 1 NaN
2 3 NaN NaN NaN 1 1 1
3 2 1 1 1 NaN NaN NaN
sdf_pivot()
函数中似乎没有 value.var 参数。
我是 spark 的新手,如有任何建议,我们将不胜感激。
我需要编写自定义函数吗?
**注意** —— 我试过了
## Pivoting
# Custom fun.aggregate for sdf_pivot(): builds a Spark Column expression
# via a static call on org.apache.spark.sql.functions over column "value",
# then applies it with DataFrame.agg on the grouped data `gdf`.
# NOTE(review): "paste" is an R function, not a method of
# org.apache.spark.sql.functions, so invoke_static fails with
# IllegalArgumentException (see the error quoted below).
cohort_paste <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"paste",
"value"
)
gdf %>% invoke("agg", expr, list())
}
报错
Error: java.lang.IllegalArgumentException: invalid method paste for
object org.apache.spark.sql.functions
我真的想使用 paste 功能。
尝试使用数值列
# Numeric-value variant: 18 rows (2 ids x 3 names x 3 repeats) with
# random integer values, randomly subsampled to 10 rows, then shipped
# to Spark.
# NOTE(review): T should be spelled TRUE; there is no set.seed() here,
# so this sample is not reproducible.
df <- tibble(
id = c(rep(1, 9), rep(2, 9)),
name = rep(rep(c("A", "B", "C"), each=3), 2),
value = sample(10,18,replace=T)
)[sample(1:18, size=10), ]
spark_dt <- copy_to(sc, df, overwrite=TRUE)
# fun.aggregate for sdf_pivot(): Spark's collect_list gathers every
# "value" in a group into an array column (keeps all values, not one).
collect_list <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"collect_list",
"value"
)
gdf %>% invoke("agg", expr, list())
}
# Pivot, then try to flatten each array column into one space-separated
# string. This raised the AnalysisException quoted below; the message is
# self-contradictory ("requires (array or string) ... is of array type"),
# which suggests a Spark/sparklyr version mismatch -- TODO confirm versions.
sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>%
mutate_at(vars(-id), funs(concat_ws(" ", .)))
错误日志-
Error: org.apache.spark.sql.AnalysisException: cannot resolve
'concat_ws(' ', sparklyr_tmp_79e15abf584.A
)' due to data type
mismatch: argument 2 requires (array or string) type, however,
'sparklyr_tmp_79e15abf584.A
' is of array type.; line 1 pos 13;
'GlobalLimit 10
+- 'LocalLimit 10 +- 'Project [id#3038, concat_ws( , A#3156) AS A#3172, concat_ws( , B#3158) AS B#3173, concat_ws( , C#3160) AS
C#3174]
+- SubqueryAlias sparklyr_tmp_79e15abf584
+- Aggregate [id#3038], [id#3038, collect_list(if ((name#3039 = A)) value#3040 else cast(null as int), 0, 0) AS A#3156, collect_list(if ((name#3039 = B)) value#3040 else cast(null as int),
0, 0) AS B#3158, collect_list(if ((name#3039 = C)) value#3040 else
cast(null as int), 0, 0) AS C#3160]
+- Project [id#3038, name#3039, value#3040]
+- SubqueryAlias df
+- Relation[id#3038,name#3039,value#3040] csv
这失败了,因为 paste 不是 Spark 函数,您不能在此上下文中执行 R 代码。
您可以尝试这样的操作:
library(dplyr)
library(sparklyr)
# Connect to local Spark (8 threads) and build a reproducible sample:
# 18 (id, name, value) rows with letter values, subsampled to 10 rows,
# then copied into Spark.
sc <- spark_connect("local[8]")
set.seed(1)
df <- tibble(
id = c(rep(1, 9), rep(2, 9)),
name = rep(rep(c("A", "B", "C"), each=3), 2),
value = sample(letters, size=18)
)[sample(1:18, size=10), ]
spark_dt <- copy_to(sc, df, overwrite=TRUE)
# fun.aggregate for sdf_pivot(): collect_list folds all "value"s of a
# group into a Spark array column instead of the default aggregate.
collect_list <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"collect_list",
"value"
)
gdf %>% invoke("agg", expr, list())
}
# Pivot id ~ name, then render each array column as a single
# space-separated string with Spark SQL's concat_ws (this runs in
# Spark, not in R -- R's paste cannot be used in this context).
sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>%
mutate_at(vars(-id), funs(concat_ws(" ", .)))
# # Source: lazy query [?? x 4]
# # Database: spark_connection
# id A B C
# <dbl> <chr> <chr> <chr>
# 1 1.00 j g u e w
# 2 2.00 b c v x f
您还可以使用 window 函数:
# fun.aggregate for sdf_pivot(): Spark SQL's `first` keeps a single
# "value" per group.
first <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"first",
"value"
)
gdf %>% invoke("agg", expr, list())
}
# Window-function alternative: number the repeats inside each (id, name)
# group with row_number(), suffix the name (A_1, A_2, ...) so every
# pivot cell holds at most one value, then pivot taking that value.
spark_dt %>%
group_by(id, name) %>%
arrange(value) %>%
mutate(i = row_number()) %>%
mutate(name = concat_ws("_", name, i)) %>%
select(-i) %>% sdf_pivot(id ~ name, first)
# # Source: table<sparklyr_tmp_1ba404d8f51> [?? x 8]
# # Database: spark_connection
# id A_1 A_2 A_3 B_1 B_2 B_3 C_1
# <dbl> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
# 1 1.00 m NA NA f n v d
# 2 2.00 b x y h r NA NA
我正在尝试使用 sdf_pivot() 函数对我的 spark 数据帧做类似 dcast 的透视。我想像 reshape2 包中 dcast() 的 value.var 参数那样,把某一列的值填入透视结果的单元格。请看下面的例子。
# Minimal long-format example: 11 rows of (id, name, value).
id <- c(1,1,1,1,1,2,2,2,3,3,3)
name <- c("A","B","C","D","E","A","B","C","D","E","F")
value <- c(1,2,3,1,1,2,3,1,1,2,3)
dt <- data.frame(id,name,value)
# Pivot wide: one column per `name`, cells filled from `value` via
# value.var; (id, name) pairs with no matching row become NA.
reshape2::dcast(dt,id~name,value.var = "value")
output1-
id A B C D E F
1 1 1 2 3 1 1 NA
2 2 2 3 1 NA NA NA
3 3 NA NA NA 1 2 3
# Copy the local data frame into Spark, then pivot id ~ name.
# sdf_pivot() has no value.var argument; the 1/NaN cells in the output
# below suggest its default aggregate ignores `value` (presumably a
# count / presence indicator -- TODO confirm against sparklyr docs).
spark_dt <- copy_to(sc, dt)
sdf_pivot(spark_dt,id~name)
output2-
id A B C D E F
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1 1 1 1 1 1 1 NaN
2 3 NaN NaN NaN 1 1 1
3 2 1 1 1 NaN NaN NaN
sdf_pivot()
函数中似乎没有 value.var 参数。
我是 spark 的新手,如有任何建议,我们将不胜感激。
我需要编写自定义函数吗?
**注意** —— 我试过了
## Pivoting
# Custom fun.aggregate for sdf_pivot(): builds a Spark Column expression
# via a static call on org.apache.spark.sql.functions over column "value",
# then applies it with DataFrame.agg on the grouped data `gdf`.
# NOTE(review): "paste" is an R function, not a method of
# org.apache.spark.sql.functions, so invoke_static fails with
# IllegalArgumentException (see the error quoted below).
cohort_paste <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"paste",
"value"
)
gdf %>% invoke("agg", expr, list())
}
报错
Error: java.lang.IllegalArgumentException: invalid method paste for object org.apache.spark.sql.functions
我真的想使用 paste 功能。
# Numeric-value variant: 18 rows (2 ids x 3 names x 3 repeats) with
# random integer values, randomly subsampled to 10 rows, then shipped
# to Spark.
# NOTE(review): T should be spelled TRUE; there is no set.seed() here,
# so this sample is not reproducible.
df <- tibble(
id = c(rep(1, 9), rep(2, 9)),
name = rep(rep(c("A", "B", "C"), each=3), 2),
value = sample(10,18,replace=T)
)[sample(1:18, size=10), ]
spark_dt <- copy_to(sc, df, overwrite=TRUE)
# fun.aggregate for sdf_pivot(): Spark's collect_list gathers every
# "value" in a group into an array column (keeps all values, not one).
collect_list <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"collect_list",
"value"
)
gdf %>% invoke("agg", expr, list())
}
# Pivot, then try to flatten each array column into one space-separated
# string. This raised the AnalysisException quoted below; the message is
# self-contradictory ("requires (array or string) ... is of array type"),
# which suggests a Spark/sparklyr version mismatch -- TODO confirm versions.
sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>%
mutate_at(vars(-id), funs(concat_ws(" ", .)))
错误日志-
Error: org.apache.spark.sql.AnalysisException: cannot resolve 'concat_ws(' ', sparklyr_tmp_79e15abf584.
A
)' due to data type mismatch: argument 2 requires (array or string) type, however, 'sparklyr_tmp_79e15abf584.A
' is of array type.; line 1 pos 13; 'GlobalLimit 10 +- 'LocalLimit 10 +- 'Project [id#3038, concat_ws( , A#3156) AS A#3172, concat_ws( , B#3158) AS B#3173, concat_ws( , C#3160) AS C#3174] +- SubqueryAlias sparklyr_tmp_79e15abf584 +- Aggregate [id#3038], [id#3038, collect_list(if ((name#3039 = A)) value#3040 else cast(null as int), 0, 0) AS A#3156, collect_list(if ((name#3039 = B)) value#3040 else cast(null as int), 0, 0) AS B#3158, collect_list(if ((name#3039 = C)) value#3040 else cast(null as int), 0, 0) AS C#3160] +- Project [id#3038, name#3039, value#3040] +- SubqueryAlias df +- Relation[id#3038,name#3039,value#3040] csv
这失败了,因为 paste 不是 Spark 函数,您不能在此上下文中执行 R 代码。
您可以尝试这样的操作:
library(dplyr)
library(sparklyr)
# Connect to local Spark (8 threads) and build a reproducible sample:
# 18 (id, name, value) rows with letter values, subsampled to 10 rows,
# then copied into Spark.
sc <- spark_connect("local[8]")
set.seed(1)
df <- tibble(
id = c(rep(1, 9), rep(2, 9)),
name = rep(rep(c("A", "B", "C"), each=3), 2),
value = sample(letters, size=18)
)[sample(1:18, size=10), ]
spark_dt <- copy_to(sc, df, overwrite=TRUE)
# fun.aggregate for sdf_pivot(): collect_list folds all "value"s of a
# group into a Spark array column instead of the default aggregate.
collect_list <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"collect_list",
"value"
)
gdf %>% invoke("agg", expr, list())
}
# Pivot id ~ name, then render each array column as a single
# space-separated string with Spark SQL's concat_ws (this runs in
# Spark, not in R -- R's paste cannot be used in this context).
sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>%
mutate_at(vars(-id), funs(concat_ws(" ", .)))
# # Source: lazy query [?? x 4]
# # Database: spark_connection
# id A B C
# <dbl> <chr> <chr> <chr>
# 1 1.00 j g u e w
# 2 2.00 b c v x f
您还可以使用 window 函数:
# fun.aggregate for sdf_pivot(): Spark SQL's `first` keeps a single
# "value" per group.
first <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"first",
"value"
)
gdf %>% invoke("agg", expr, list())
}
# Window-function alternative: number the repeats inside each (id, name)
# group with row_number(), suffix the name (A_1, A_2, ...) so every
# pivot cell holds at most one value, then pivot taking that value.
spark_dt %>%
group_by(id, name) %>%
arrange(value) %>%
mutate(i = row_number()) %>%
mutate(name = concat_ws("_", name, i)) %>%
select(-i) %>% sdf_pivot(id ~ name, first)
# # Source: table<sparklyr_tmp_1ba404d8f51> [?? x 8]
# # Database: spark_connection
# id A_1 A_2 A_3 B_1 B_2 B_3 C_1
# <dbl> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
# 1 1.00 m NA NA f n v d
# 2 2.00 b x y h r NA NA