使用 sdf_pivot() 函数旋转后如何获取列值(dcast 中的 value.var)

How to get column values (value.var in dcast) after pivoting using sdf_pivot() function

我正在尝试使用 sdf_pivot() 函数 dcast 我的 spark 数据帧。我要
显示 reshape2 包中 dcast() 中的 value.var 参数等列的值。请看下面的例子。

id <- c(1,1,1,1,1,2,2,2,3,3,3)
name <- c("A","B","C","D","E","A","B","C","D","E","F")
value <- c(1,2,3,1,1,2,3,1,1,2,3)
dt <- data.frame(id,name,value)
reshape2::dcast(dt,id~name,value.var = "value")

output1-

  id  A  B  C  D  E  F
1  1  1  2  3  1  1 NA
2  2  2  3  1 NA NA NA
3  3 NA NA NA  1  2  3

spark_dt <- copy_to(sc, dt)
sdf_pivot(spark_dt,id~name)

output2-

id     A     B     C     D     E     F
  <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1     1     1     1     1     1     1   NaN
2     3   NaN   NaN   NaN     1     1     1
3     2     1     1     1   NaN   NaN   NaN

sdf_pivot() 函数中似乎没有 value.var 参数。 我是 spark 的新手,如有任何建议,我们将不胜感激。 我需要编写自定义函数吗?

注意**- 我试过了

##Pivoting
cohort_paste <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "paste",
    "value"
  )
  gdf %>% invoke("agg", expr, list())
}

报错

Error: java.lang.IllegalArgumentException: invalid method paste for object org.apache.spark.sql.functions

我真的想使用paste功能。

尝试使用数值列
df <- tibble(
    id = c(rep(1, 9), rep(2, 9)),
    name = rep(rep(c("A", "B", "C"), each=3), 2),
    value = sample(10,18,replace=T)
)[sample(1:18, size=10), ]

spark_dt <- copy_to(sc, df, overwrite=TRUE)

collect_list <- function(gdf) {
    expr <- invoke_static(
        sc,
        "org.apache.spark.sql.functions",
        "collect_list",
        "value"
    )
    gdf %>% invoke("agg", expr, list())
}

sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>% 
    mutate_at(vars(-id), funs(concat_ws(" ", .)))

错误日志-

Error: org.apache.spark.sql.AnalysisException: cannot resolve 'concat_ws(' ', sparklyr_tmp_79e15abf584.A)' due to data type mismatch: argument 2 requires (array or string) type, however, 'sparklyr_tmp_79e15abf584.A' is of array type.; line 1 pos 13; 'GlobalLimit 10 +- 'LocalLimit 10 +- 'Project [id#3038, concat_ws( , A#3156) AS A#3172, concat_ws( , B#3158) AS B#3173, concat_ws( , C#3160) AS C#3174] +- SubqueryAlias sparklyr_tmp_79e15abf584 +- Aggregate [id#3038], [id#3038, collect_list(if ((name#3039 = A)) value#3040 else cast(null as int), 0, 0) AS A#3156, collect_list(if ((name#3039 = B)) value#3040 else cast(null as int), 0, 0) AS B#3158, collect_list(if ((name#3039 = C)) value#3040 else cast(null as int), 0, 0) AS C#3160] +- Project [id#3038, name#3039, value#3040] +- SubqueryAlias df +- Relation[id#3038,name#3039,value#3040] csv

这失败了,因为 paste 不是 Spark 函数,您不能在此上下文中执行 R 代码。

您可以尝试这样的操作:

library(dplyr)
library(sparklyr)

sc <- spark_connect("local[8]")
set.seed(1)

df <- tibble(
  id = c(rep(1, 9), rep(2, 9)),
  name = rep(rep(c("A", "B", "C"), each=3), 2),
  value = sample(letters, size=18)
)[sample(1:18, size=10), ]

spark_dt <- copy_to(sc, df, overwrite=TRUE)

collect_list <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "collect_list",
    "value"
  )
  gdf %>% invoke("agg", expr, list())
}

sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>% 
  mutate_at(vars(-id), funs(concat_ws(" ", .)))

#  # Source:   lazy query [?? x 4]
#  # Database: spark_connection
#       id A     B     C    
#    <dbl> <chr> <chr> <chr>
#  1  1.00 j g   u e   w    
#  2  2.00 b c   v x   f  

您还可以使用 window 函数:

first <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "first",
    "value"
  )
  gdf %>% invoke("agg", expr, list())
}


spark_dt %>% 
  group_by(id, name) %>% 
  arrange(value) %>% 
  mutate(i = row_number()) %>% 
  mutate(name = concat_ws("_", name,  i)) %>% 
  select(-i) %>% sdf_pivot(id ~ name, first)

# # Source:   table<sparklyr_tmp_1ba404d8f51> [?? x 8]
# # Database: spark_connection
#      id A_1   A_2   A_3   B_1   B_2   B_3   C_1  
#   <dbl> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
# 1  1.00 m     NA    NA    f     n     v     d    
# 2  2.00 b     x     y     h     r     NA    NA