如何在 sparklyr 中使用 sdf_pivot() 并连接字符串?
How to use sdf_pivot() in sparklyr and concatenate strings?
我正在尝试使用 sparklyr 中的 sdf_pivot() 函数将长格式数据帧 "gather" 转换为宽格式。变量的值是我想连接的字符串。
这是一个我认为应该有效但无效的简单示例:
library(sparkylr)
d <- data.frame(id=c("1", "1", "2", "2", "1", "2"),
x=c("200", "200", "200", "201", "201", "201"),
y=c("This", "That", "The", "Other", "End", "End"))
d_sdf <- copy_to(sc, d, "d")
sdf_pivot(d_sdf, id ~ x, paste)
我希望它产生的是:
| id | `200` | `201` |
|====|=============|=================|
| 1 | "This That" | "End" |
| 2 | "The" | "Other End" |
不幸的是,这给了我一个错误说明:
Error in as.vector(x, "character") :
cannot coerce type 'environment' to vector of type 'character'
我也试过使用 "collect_list"
,结果出现了这个错误:
Error: java.lang.IllegalArgumentException: invalid method collect_list
for object 641
有没有办法做我想做的事情?
我深入研究了 sdf_pivot
的测试,看来您可以在自定义 fun.aggregate
函数中使用 invoke
来访问 collect_list
函数:
fun.aggregate <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"expr",
"collect_list(y)" #this is your own "y" variable
)
gdf %>% invoke("agg", expr, list())
}
然后您可以在 sdf_pivot
:
中使用
d_sdf_wide <- sdf_pivot(d_sdf, id ~ x, fun.aggregate)
这样做确实有效:
> d_sdf_wide
Source: table<sparklyr_tmp_69c14424c5a4> [?? x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE
id `200` `201`
<chr> <list> <list>
1 1 <list [2]> <list [1]>
2 2 <list [1]> <list [2]>
(您的数据现在是 list
格式,不是字符串,但您可以根据需要连接列表,例如
d_sdf_wide %>% mutate(liststring = paste(`200`))
id `200` `201` liststring
<chr> <list> <list> <chr>
1 1 <list [2]> <list [1]> This That
2 2 <list [1]> <list [2]> The
(或者,你可以写一个复杂的 sql 查询,但我没试过)
我正在尝试使用 sparklyr 中的 sdf_pivot() 函数将长格式数据帧 "gather" 转换为宽格式。变量的值是我想连接的字符串。
这是一个我认为应该有效但无效的简单示例:
library(sparkylr)
d <- data.frame(id=c("1", "1", "2", "2", "1", "2"),
x=c("200", "200", "200", "201", "201", "201"),
y=c("This", "That", "The", "Other", "End", "End"))
d_sdf <- copy_to(sc, d, "d")
sdf_pivot(d_sdf, id ~ x, paste)
我希望它产生的是:
| id | `200` | `201` |
|====|=============|=================|
| 1 | "This That" | "End" |
| 2 | "The" | "Other End" |
不幸的是,这给了我一个错误说明:
Error in as.vector(x, "character") :
cannot coerce type 'environment' to vector of type 'character'
我也试过使用 "collect_list"
,结果出现了这个错误:
Error: java.lang.IllegalArgumentException: invalid method collect_list
for object 641
有没有办法做我想做的事情?
我深入研究了 sdf_pivot
的测试,看来您可以在自定义 fun.aggregate
函数中使用 invoke
来访问 collect_list
函数:
fun.aggregate <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"expr",
"collect_list(y)" #this is your own "y" variable
)
gdf %>% invoke("agg", expr, list())
}
然后您可以在 sdf_pivot
:
d_sdf_wide <- sdf_pivot(d_sdf, id ~ x, fun.aggregate)
这样做确实有效:
> d_sdf_wide
Source: table<sparklyr_tmp_69c14424c5a4> [?? x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE
id `200` `201`
<chr> <list> <list>
1 1 <list [2]> <list [1]>
2 2 <list [1]> <list [2]>
(您的数据现在是 list
格式,不是字符串,但您可以根据需要连接列表,例如
d_sdf_wide %>% mutate(liststring = paste(`200`))
id `200` `201` liststring
<chr> <list> <list> <chr>
1 1 <list [2]> <list [1]> This That
2 2 <list [1]> <list [2]> The
(或者,你可以写一个复杂的 sql 查询,但我没试过)