Sparklyr/dplyr - How to apply a user-defined function to each row of a Spark data frame and write the output of each row to a new column?
I have a spark_tbl with 160+ columns.
Here is an example of what the data frame looks like:
Key A B C D E F G .....Z
s1 0 1 0 1 1 0 1 0
s2 1 0 0 0 0 0 0 0
s3 1 1 0 0 0 0 0 0
s4 0 1 0 1 1 0 0 0
What I want to achieve is to create a new column based on the values in each column, like this:
Key A B C D E F G .....Z panel
s1 0 1 0 1 1 0 1 0 B,D,E,G
s2 1 0 0 0 0 0 0 0 A
s3 1 1 0 0 0 0 0 0 A,B
s4 0 1 0 1 1 0 0 0 B,D,E
That is, go through each column of every row; whenever the value is 1, append the column name to a string, and finally write that string to a new column named panel.
I tried writing a user-defined function:
get_panel <- function(eachrow) {
  # `columns` is presumably the full vector of column names (including "Key"),
  # hence the i + 1 offset when looking up the name for value element i.
  id <- ""
  row_list <- as.list(eachrow)
  for (i in seq_along(row_list)) {
    if (row_list[[i]] == "1") {
      if (id == "") {
        id <- columns[i + 1]
      } else {
        id <- paste0(id, ",", columns[i + 1])
      }
    }
  }
  return(id)
}
This works on a regular data frame using the apply function.
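(For context, a rough sketch of how that would be invoked on a local data frame; `df` here is assumed to be the example data frame above.)
# Sketch only: apply get_panel over the value columns of a local data frame.
columns <- colnames(df)              # includes "Key", matching the i + 1 offset
df$panel <- apply(df[, -1], 1, get_panel)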
However, how can I apply this function to a Spark DataFrame / tbl_spark?
Check out this Scala solution.
scala> val df = Seq(("s1",0,1,0,1,1,0,1),
| ("s2",1,0,0,0,0,0,0),
| ("s3",1,1,0,0,0,0,0),
| ("s4",0,1,0,1,1,0,0)).toDF("key","A","B","C","D","E","F","G")
df: org.apache.spark.sql.DataFrame = [key: string, A: int ... 6 more fields]
scala> df.show
+---+---+---+---+---+---+---+---+
|key| A| B| C| D| E| F| G|
+---+---+---+---+---+---+---+---+
| s1| 0| 1| 0| 1| 1| 0| 1|
| s2| 1| 0| 0| 0| 0| 0| 0|
| s3| 1| 1| 0| 0| 0| 0| 0|
| s4| 0| 1| 0| 1| 1| 0| 0|
+---+---+---+---+---+---+---+---+
scala> val columns = df.columns.filter(x=>x != "key")
columns: Array[String] = Array(A, B, C, D, E, F, G)
scala> val p1 = columns.map( x => when(col(x)===lit(1),x+",").otherwise(lit(""))).reduce(concat(_,_)).as("panel")
p1: org.apache.spark.sql.Column = concat(concat(concat(concat(concat(concat(CASE WHEN (A = 1) THEN A, ELSE END, CASE WHEN (B = 1) THEN B, ELSE END), CASE WHEN (C = 1) THEN C, ELSE END), CASE WHEN (D = 1) THEN D, ELSE END), CASE WHEN (E = 1) THEN E, ELSE END), CASE WHEN (F = 1) THEN F, ELSE END), CASE WHEN (G = 1) THEN G, ELSE END) AS `panel`
scala> df.select(p1).show(false)
+--------+
|panel |
+--------+
|B,D,E,G,|
|A, |
|A,B, |
|B,D,E, |
+--------+
With all the columns:
scala> df.select(col("*"), p1).show
+---+---+---+---+---+---+---+---+--------+
|key| A| B| C| D| E| F| G| panel|
+---+---+---+---+---+---+---+---+--------+
| s1| 0| 1| 0| 1| 1| 0| 1|B,D,E,G,|
| s2| 1| 0| 0| 0| 0| 0| 0| A,|
| s3| 1| 1| 0| 0| 0| 0| 0| A,B,|
| s4| 0| 1| 0| 1| 1| 0| 0| B,D,E,|
+---+---+---+---+---+---+---+---+--------+
The result has a trailing comma, which can be removed with regexp_replace:
scala> df.select(col("*"), regexp_replace(p1,",$","").as("panel")).show
+---+---+---+---+---+---+---+---+-------+
|key| A| B| C| D| E| F| G| panel|
+---+---+---+---+---+---+---+---+-------+
| s1| 0| 1| 0| 1| 1| 0| 1|B,D,E,G|
| s2| 1| 0| 0| 0| 0| 0| 0| A|
| s3| 1| 1| 0| 0| 0| 0| 0| A,B|
| s4| 0| 1| 0| 1| 1| 0| 0| B,D,E|
+---+---+---+---+---+---+---+---+-------+
scala>
EDIT 2:
A cleaner approach is to use the array() function together with concat_ws():
scala> val df = Seq(("s1",0,1,0,1,1,0,1),("s2",1,0,0,0,0,0,0),("s3",1,1,0,0,0,0,0),("s4",0,1,0,1,1,0,0)).toDF("key","A","B","C","D","E","F","G")
df: org.apache.spark.sql.DataFrame = [key: string, A: int ... 6 more fields]
scala> df.show(false)
+---+---+---+---+---+---+---+---+
|key|A |B |C |D |E |F |G |
+---+---+---+---+---+---+---+---+
|s1 |0 |1 |0 |1 |1 |0 |1 |
|s2 |1 |0 |0 |0 |0 |0 |0 |
|s3 |1 |1 |0 |0 |0 |0 |0 |
|s4 |0 |1 |0 |1 |1 |0 |0 |
+---+---+---+---+---+---+---+---+
scala> val p1 = columns.map( x => when(col(x)===lit(1),x).otherwise(null))
p1: Array[org.apache.spark.sql.Column] = Array(CASE WHEN (A = 1) THEN A ELSE NULL END, CASE WHEN (B = 1) THEN B ELSE NULL END, CASE WHEN (C = 1) THEN C ELSE NULL END, CASE WHEN (D = 1) THEN D ELSE NULL END, CASE WHEN (E = 1) THEN E ELSE NULL END, CASE WHEN (F = 1) THEN F ELSE NULL END, CASE WHEN (G = 1) THEN G ELSE NULL END)
scala> df.select(col("*"),array(p1:_*).alias("panel")).withColumn("panel2",concat_ws(",",'panel)).show(false)
+---+---+---+---+---+---+---+---+----------------+-------+
|key|A |B |C |D |E |F |G |panel |panel2 |
+---+---+---+---+---+---+---+---+----------------+-------+
|s1 |0 |1 |0 |1 |1 |0 |1 |[, B,, D, E,, G]|B,D,E,G|
|s2 |1 |0 |0 |0 |0 |0 |0 |[A,,,,,,] |A |
|s3 |1 |1 |0 |0 |0 |0 |0 |[A, B,,,,,] |A,B |
|s4 |0 |1 |0 |1 |1 |0 |0 |[, B,, D, E,,] |B,D,E |
+---+---+---+---+---+---+---+---+----------------+-------+
scala>
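(Untested sketch: the same CASE WHEN / concat_ws idea could presumably be driven from R by assembling the Spark SQL expression as a string and running it with sparklyr::sdf_sql; `sc` is assumed to be an open Spark connection and `df` the local example data frame.)
library(sparklyr)
library(dplyr)
df_tbl <- copy_to(sc, df, "df", overwrite = TRUE)
value_cols <- setdiff(colnames(df_tbl), "key")
# One "CASE WHEN `A` = 1 THEN 'A' END" per value column; concat_ws skips the
# NULLs produced when the condition is false, so no trailing commas appear.
cases <- paste0("CASE WHEN `", value_cols, "` = 1 THEN '", value_cols, "' END")
panel_expr <- paste0("concat_ws(',', ", paste(cases, collapse = ", "), ") AS panel")
sdf_sql(sc, paste0("SELECT *, ", panel_expr, " FROM df"))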
Not sure this will translate 100% to sparklyr, but you may be able to use sdf_nest:
library(tidyverse)
mat <- matrix(c(paste0("s", 1:4), as.numeric(sample(0:1, 4 * 26, TRUE))), ncol = 27)
colnames(mat) <- c("Key", LETTERS[1:26])
df <- data.frame(mat, stringsAsFactors = FALSE) %>%
mutate_at(vars(-"Key"), as.numeric) %>%
as_data_frame()
df
#> # A tibble: 4 x 27
#> Key A B C D E F G H I J K
#> <chr> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#> 1 s1 0 1 1 1 1 0 0 0 0 1 1
#> 2 s2 0 1 0 1 0 1 1 1 1 0 0
#> 3 s3 0 1 1 1 1 0 0 0 0 1 1
#> 4 s4 0 0 0 1 0 0 0 1 1 0 1
#> # ... with 15 more variables: L <dbl>, M <dbl>, N <dbl>, O <dbl>, P <dbl>,
#> # Q <dbl>, R <dbl>, S <dbl>, T <dbl>, U <dbl>, V <dbl>, W <dbl>,
#> # X <dbl>, Y <dbl>, Z <dbl>
df %>%
group_by(Key) %>%
nest() %>%
mutate(panel = map_chr(data, ~ unlist(.) %>% as.logical %>% names(df)[-1][.] %>% paste(collapse = ",")))
#> # A tibble: 4 x 3
#> Key data panel
#> <chr> <list> <chr>
#> 1 s1 <tibble [1 x 26]> B,C,D,E,J,K,L,M,N,O,P,Q,R,W,Y,Z
#> 2 s2 <tibble [1 x 26]> B,D,F,G,H,I,N,R,S,T,V,W,X,Z
#> 3 s3 <tibble [1 x 26]> B,C,D,E,J,K,M,N,O,Q,R,S,T,V,X,Y
#> 4 s4 <tibble [1 x 26]> D,H,I,K,L,O,P,T,U,V,W,Z
I think @JasonAizkalns is on the right track. Starting from his example:
library(dplyr)
library(sparklyr)
sc <- spark_connect(master = "local")
mat <- matrix(c(paste0("s", 1:4), as.numeric(sample(0:1, 4 * 26, TRUE))), ncol = 27)
colnames(mat) <- c("Key", LETTERS[1:26])
df <- data.frame(mat, stringsAsFactors = FALSE) %>%
mutate_at(vars(-"Key"), as.numeric) %>%
as_data_frame()
df
dfs <- copy_to(sc, df, overwrite = TRUE)
We can get there with a little rlang magic.
dfs <- dfs %>% mutate(panel = "")
for (letter in LETTERS[1:26]) {
dfs <- dfs %>% mutate(panel = concat_ws(",", panel, ifelse(!!sym(letter) == 1.0, yes = letter, no = NA)))
}
dfs %>%
mutate(panel = regexp_replace(panel, "^,", "")) %>% # remove leading comma
select(Key, A:D, panel)
This gives what I believe you want:
# Source: spark<?> [?? x 6]
Key A B C D panel
* <chr> <dbl> <dbl> <dbl> <dbl> <chr>
1 s1 0 0 1 1 C,D,E,G,O,P,Q,U,Z
2 s2 1 0 0 1 A,D,G,K,L,M,N,Q,S,U,W
3 s3 0 1 0 0 B,E,L,M,O,Q,R,S,T,Y
4 s4 1 1 0 1 A,B,D,E,G,I,J,M,N,R,S,T,U,V,Y,Z
The key here is that concat_ws is a Spark SQL function (not an R function). See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#concat_ws-java.lang.String-org.apache.spark.sql.Column...-
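For what it's worth, the explicit for loop over the letters can also be written as a fold; a minimal sketch using purrr::reduce, assuming `dfs` is the copied Spark table from above:
library(purrr)
library(rlang)
# Sketch: fold the same mutate over the letter columns instead of a for loop.
# concat_ws ignores the NULLs from the NA branch, so only matching names remain.
dfs_panel <- reduce(
  LETTERS[1:26],
  function(tbl, letter) {
    tbl %>%
      mutate(panel = concat_ws(",", panel, ifelse(!!sym(letter) == 1.0, yes = letter, no = NA)))
  },
  .init = dfs %>% mutate(panel = "")
) %>%
  mutate(panel = regexp_replace(panel, "^,", ""))  # remove leading comma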