How to use sparklyr to summarize categorical variable levels
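For each categorical variable in a dataset, I want to get the counts and summary statistics for each level. I can do this with the dlookr R package, using its diagnose_category() function. Since I don't have that package at work, I recreated the function using dplyr.
In sparklyr, I can get the counts for a single variable at a time. I need help extending this to all categorical variables.
Help needed:
Implementing the function with sparklyr
Table 1: Desired final output: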
# A tibble: 20 x 6
variables levels N freq ratio rank
<chr> <ord> <int> <int> <dbl> <int>
1 cut Ideal 53940 21551 40.0 1
2 cut Premium 53940 13791 25.6 2
3 cut Very Good 53940 12082 22.4 3
4 cut Good 53940 4906 9.10 4
5 cut Fair 53940 1610 2.98 5
6 color G 53940 11292 20.9 1
7 color E 53940 9797 18.2 2
8 color F 53940 9542 17.7 3
9 color H 53940 8304 15.4 4
10 color D 53940 6775 12.6 5
11 color I 53940 5422 10.1 6
12 color J 53940 2808 5.21 7
13 clarity SI1 53940 13065 24.2 1
14 clarity VS2 53940 12258 22.7 2
15 clarity SI2 53940 9194 17.0 3
16 clarity VS1 53940 8171 15.1 4
17 clarity VVS2 53940 5066 9.39 5
18 clarity VVS1 53940 3655 6.78 6
19 clarity IF 53940 1790 3.32 7
20 clarity I1 53940 741 1.37 8
R code:
# Categorical Variable Profile
# Table based on dlookr package, diagnose_category() function
# variables : variable names
# types: the data type of the variable
# levels: level names
# N : Number of observation
# freq : Number of observation at the level
# ratio : Percentage of observation at the level
# rank : Rank of occupancy ratio of levels
library(ggplot2)
library(dplyr)
library(tidyr)
library(purrr)
library(tibble)
library(stringr)
# Helper Function
# Counts the levels of column x and derives N, freq, ratio and rank
cat_level_summary <- function(df, x) {
  count(df, x, sort = TRUE) %>%
    transmute(levels = x, N = sum(n), freq = n,
              ratio = n / sum(n) * 100, rank = row_number())
}
# Loading
diamonds_tbl <- diamonds
# Main Code
CategoricalVariableProfile <- diamonds_tbl %>%
  select_if(~ !is.numeric(.)) %>%
  map(~ cat_level_summary(data.frame(x = .x), x)) %>%
  do.call(rbind.data.frame, .) %>%
  rownames_to_column(., "variables") %>%
  # Row names look like "cut.1"; keep the part before the last dot
  mutate(variables = str_match(variables, ".*(?=\\.)")[, 1])
Spark code:
# Spark data table (assumes sc is an active Spark connection)
diamonds_tbl <- copy_to(sc, diamonds, "diamonds", overwrite = TRUE)
CategoricalVariableProfile <- diamonds_tbl %>%
  group_by(cut) %>%
  summarize(count = n()) %>%
  sdf_register("CategoricalVariableProfile")
Flatten the data using sdf_gather:
long <- diamonds_tbl %>%
  select(cut, color, clarity) %>%
  sdf_gather("variable", "level", "cut", "color", "clarity")
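The sdf_gather helper is not defined in this thread and, as far as I can tell, is not exported by sparklyr itself, so treat it as a user-written function. The Generate explode(map(...)) node in the optimized plan further down suggests it turns each selected column into a (variable, level) pair. As a rough local illustration only, the same wide-to-long reshape can be sketched with tidyr::pivot_longer on an ordinary tibble. The toy data below is made up, and this is a plain R analogue rather than Spark code:

```r
library(dplyr)
library(tidyr)

# Hypothetical toy data standing in for the Spark table
toy <- tibble(
  cut   = c("Ideal", "Good"),
  color = c("E", "G"),
  price = c(326, 327)
)

# Keep only the categorical columns, then stack them into
# one (variable, level) pair per original cell
long <- toy %>%
  select(cut, color) %>%
  pivot_longer(c(cut, color), names_to = "variable", values_to = "level")

print(long)
```

Note that the diamonds columns are ordered factors with different level sets, so locally they may need converting to character before pivot_longer can combine them.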
Aggregate by variable and level:
counts <- long %>% group_by(variable, level) %>% summarise(freq = n())
Finally, apply the required window functions:
result <- counts %>%
  arrange(-freq) %>%
  mutate(
    rank = rank(),
    total = sum(freq, na.rm = TRUE),
    ratio = freq / total * 100)
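To make the aggregation and window steps concrete, here is a minimal local sketch in plain dplyr, assuming a long table with variable and level columns like the one built in the flattening step. The toy data is hypothetical, and min_rank(desc(freq)) is used as a local stand-in for the SQL rank() ordered by -freq:

```r
library(dplyr)

# Hypothetical long-format data: one row per (variable, level) observation
long <- tibble(
  variable = rep(c("cut", "color"), each = 6),
  level    = c("Ideal", "Ideal", "Good", "Ideal", "Good", "Fair",
               "E", "E", "G", "G", "E", "E")
)

profile <- long %>%
  # Frequency of each level within each variable
  count(variable, level, name = "freq") %>%
  # Window calculations are done per variable
  group_by(variable) %>%
  mutate(
    rank  = min_rank(desc(freq)),  # 1 = most frequent level
    total = sum(freq),             # observations per variable
    ratio = freq / total * 100     # share of each level in percent
  ) %>%
  ungroup() %>%
  arrange(variable, rank)

print(profile)
```

One design point: rank-style functions give tied levels the same rank, whereas the row_number() used in the question's helper always assigns distinct ranks, so the two can differ when frequencies tie.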
which will give you
result
# Source: spark<?> [?? x 6]
# Groups: variable
# Ordered by: -freq
variable level freq rank total ratio
<chr> <chr> <dbl> <int> <dbl> <dbl>
1 cut Ideal 21551 1 53940 40.0
2 cut Premium 13791 2 53940 25.6
3 cut Very Good 12082 3 53940 22.4
4 cut Good 4906 4 53940 9.10
5 cut Fair 1610 5 53940 2.98
6 clarity SI1 13065 1 53940 24.2
7 clarity VS2 12258 2 53940 22.7
8 clarity SI2 9194 3 53940 17.0
9 clarity VS1 8171 4 53940 15.1
10 clarity VVS2 5066 5 53940 9.39
# … with more rows
with the following optimized plan:
optimizedPlan(result)
<jobj[165]>
org.apache.spark.sql.catalyst.plans.logical.Project
Project [variable#524, level#525, freq#1478L, rank#1479, total#1480L, ((cast(freq#1478L as double) / cast(total#1480L as double)) * 100.0) AS ratio#1481]
+- Window [rank(_w1#1493L) windowspecdefinition(variable#524, _w1#1493L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#1479], [variable#524], [_w1#1493L ASC NULLS FIRST]
   +- Window [sum(freq#1478L) windowspecdefinition(variable#524, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS total#1480L], [variable#524]
      +- Project [variable#524, level#525, freq#1478L, -freq#1478L AS _w1#1493L]
         +- Sort [-freq#1478L ASC NULLS FIRST], true
            +- Aggregate [variable#524, level#525], [variable#524, level#525, count(1) AS freq#1478L]
               +- Generate explode(map(cut, cut#19, color, color#20, clarity, clarity#21)), [0, 1, 2], false, [variable#524, level#525]
                  +- Project [cut#19, color#20, clarity#21]
                     +- InMemoryRelation [carat#18, cut#19, color#20, clarity#21, depth#22, table#23, price#24, x#25, y#26, z#27], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- Scan ExistingRDD[carat#18,cut#19,color#20,clarity#21,depth#22,table#23,price#24,x#25,y#26,z#27]
and query (excluding the sdf_gather component):
dbplyr::remote_query(result)
<SQL> SELECT `variable`, `level`, `freq`, `rank`, `total`, `freq` / `total` * 100.0 AS `ratio`
FROM (SELECT `variable`, `level`, `freq`, rank() OVER (PARTITION BY `variable` ORDER BY -`freq`) AS `rank`, sum(`freq`) OVER (PARTITION BY `variable`) AS `total`
FROM (SELECT *
FROM (SELECT `variable`, `level`, count(*) AS `freq`
FROM `sparklyr_tmp_ded2576b9f1`
GROUP BY `variable`, `level`) `dsbksdfhtf`
ORDER BY -`freq`) `obyrzsxeus`) `ekejqyjrfz`