在 Sparklyr 中按组计算分位数
calculate quantile by group in Sparklyr
我在 Spark 中有一个数据框,想在按特定列分组后计算 0.1 分位数。
例如:
> library(sparklyr)
> library(tidyverse)
> con = spark_connect(....)
> diamonds_sdl = copy_to(con, diamonds)
> diamonds
# Source: table<diamonds> [?? x 10]
# Database: spark_connection
carat cut color clarity depth table price x y z
<dbl> <chr> <chr> <chr> <dbl> <dbl> <int> <dbl> <dbl> <dbl>
1 0.230 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
2 0.210 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
3 0.230 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
4 0.290 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
5 0.310 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
6 0.240 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
7 0.240 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
8 0.260 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
9 0.220 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
10 0.230 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39
我的第一个想法是使用group_by和总结,但显然sparklyr中没有实现分位数功能:
> diamonds_sdl %>% group_by(color) %>% summarise(q1=quantile(carat, .1))
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'QUANTILE'. This function is neither a registered temporary function nor a permanent function registered in the database 'tsci
'.; line 1 pos 16
没有运气使用 sdl_quantile,如这里所建议的:https://github.com/rstudio/sparklyr/issues/204。请注意,我刚刚从 github.
升级了 sparklyr 和 运行ning 版本 0.7.0-9004
> diamonds_sdl %>% group_by(color) %>% summarise(q1=sdf_quantile(carat, .1))
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'SDF_QUANTILE'. This function is neither a registered temporary function nor a permanent function registered in the database '
tsci'.; line 1 pos 16
如果我想计算整列的分位数,sdf_quantile 可以工作 - 但这不是我感兴趣的:
> sdf_quantile(diamonds_sdl, "carat", 0.1)
10%
0.31
我尝试过其他方法。
首先是使用spark_apply。但是,它似乎在我的安装中无法正常工作。另一个 运行 返回错误,指出 "Rscript" 未安装在节点中。但是,我无法真正解决这个问题,因为我没有管理员权限。
> spark_apply(diamonds_sdl, function(x) quantile(x, 0.1))
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 35.0 failed 4 times, most recent failure: Lost task 0.3 in stage 35.0 (TID 735, myserver
.com, executor 393): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:98)
at sparklyr.WorkerRDD$$anon.run(rdd.scala:95)
第二种方法是使用 collect_list,如下所示:
# This one almost works
> diamonds_sdl %>% group_by(color) %>% summarise(q1=paste(collect_list(as.character(carat))))
# Source: lazy query [?? x 2]
# Database: spark_connection
color
<chr>
1 F
2 E
3 D
4 J
5 G
6 I
7 H
q1
<chr>
1 0.22 0.23 0.23 0.23 0.23 0.29 0.24 0.26 0.7 0.96 0.81 0.8 0.73 0.73 0.8 0.8 …
2 0.23 0.21 0.23 0.22 0.2 0.32 0.23 0.23 0.23 0.23 0.25 0.22 0.24 0.26 0.26 0.…
3 0.23 0.23 0.26 0.26 0.26 0.22 0.3 0.3 0.3 0.24 0.26 0.26 0.26 0.75 0.71 0.61…
4 0.31 0.24 0.3 0.23 0.31 0.3 0.3 0.3 0.31 0.31 0.3 0.33 0.3 1.17 1.05 1.05 1.…
5 0.23 0.23 0.28 0.31 0.31 0.24 0.7 0.78 0.74 0.75 0.75 0.8 0.74 0.71 0.64 0.7…
6 0.29 0.24 0.3 0.3 0.24 0.33 0.33 0.32 0.3 0.3 0.3 0.3 0.35 0.42 0.32 0.38 0.…
# but here is the error
> diamonds_sdl %>% group_by(color) %>% summarise(q1=quantile(as.numeric(paste(collect_list(as.character(carat))))))
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'QUANTILE'. This function is neither a registered temporary function nor a permanent function registered in the database 'tsci
'.; line 1 pos 16
对于分组数据,最好的选择是 percentile_approx
:
diamonds_sdl %>% group_by(color) %>% summarise(q1 = percentile_approx(carat, .1))
# Source: lazy query [?? x 2]
# Database: spark_connection
color q1
<chr> <dbl>
1 H 0.310
2 F 0.310
3 G 0.310
4 I 0.320
5 J 0.400
6 D 0.310
7 E 0.300
但是,这需要启用 Hive 支持的 Spark,并且效率低于内置 approxQuantile
。
我在 Spark 中有一个数据框,想在按特定列分组后计算 0.1 分位数。
例如:
> library(sparklyr)
> library(tidyverse)
> con = spark_connect(....)
> diamonds_sdl = copy_to(con, diamonds)
> diamonds
# Source: table<diamonds> [?? x 10]
# Database: spark_connection
carat cut color clarity depth table price x y z
<dbl> <chr> <chr> <chr> <dbl> <dbl> <int> <dbl> <dbl> <dbl>
1 0.230 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
2 0.210 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
3 0.230 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
4 0.290 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
5 0.310 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
6 0.240 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
7 0.240 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
8 0.260 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
9 0.220 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
10 0.230 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39
我的第一个想法是使用group_by和总结,但显然sparklyr中没有实现分位数功能:
> diamonds_sdl %>% group_by(color) %>% summarise(q1=quantile(carat, .1))
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'QUANTILE'. This function is neither a registered temporary function nor a permanent function registered in the database 'tsci
'.; line 1 pos 16
没有运气使用 sdl_quantile,如这里所建议的:https://github.com/rstudio/sparklyr/issues/204。请注意,我刚刚从 github.
升级了 sparklyr 和 运行ning 版本 0.7.0-9004> diamonds_sdl %>% group_by(color) %>% summarise(q1=sdf_quantile(carat, .1))
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'SDF_QUANTILE'. This function is neither a registered temporary function nor a permanent function registered in the database '
tsci'.; line 1 pos 16
如果我想计算整列的分位数,sdf_quantile 可以工作 - 但这不是我感兴趣的:
> sdf_quantile(diamonds_sdl, "carat", 0.1)
10%
0.31
我尝试过其他方法。
首先是使用spark_apply。但是,它似乎在我的安装中无法正常工作。另一个 运行 返回错误,指出 "Rscript" 未安装在节点中。但是,我无法真正解决这个问题,因为我没有管理员权限。
> spark_apply(diamonds_sdl, function(x) quantile(x, 0.1))
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 35.0 failed 4 times, most recent failure: Lost task 0.3 in stage 35.0 (TID 735, myserver
.com, executor 393): java.lang.Exception: sparklyr worker rscript failure with status 255, check worker logs for details.
at sparklyr.Rscript.init(rscript.scala:98)
at sparklyr.WorkerRDD$$anon.run(rdd.scala:95)
第二种方法是使用 collect_list,如下所示:
# This one almost works
> diamonds_sdl %>% group_by(color) %>% summarise(q1=paste(collect_list(as.character(carat))))
# Source: lazy query [?? x 2]
# Database: spark_connection
color
<chr>
1 F
2 E
3 D
4 J
5 G
6 I
7 H
q1
<chr>
1 0.22 0.23 0.23 0.23 0.23 0.29 0.24 0.26 0.7 0.96 0.81 0.8 0.73 0.73 0.8 0.8 …
2 0.23 0.21 0.23 0.22 0.2 0.32 0.23 0.23 0.23 0.23 0.25 0.22 0.24 0.26 0.26 0.…
3 0.23 0.23 0.26 0.26 0.26 0.22 0.3 0.3 0.3 0.24 0.26 0.26 0.26 0.75 0.71 0.61…
4 0.31 0.24 0.3 0.23 0.31 0.3 0.3 0.3 0.31 0.31 0.3 0.33 0.3 1.17 1.05 1.05 1.…
5 0.23 0.23 0.28 0.31 0.31 0.24 0.7 0.78 0.74 0.75 0.75 0.8 0.74 0.71 0.64 0.7…
6 0.29 0.24 0.3 0.3 0.24 0.33 0.33 0.32 0.3 0.3 0.3 0.3 0.35 0.42 0.32 0.38 0.…
# but here is the error
> diamonds_sdl %>% group_by(color) %>% summarise(q1=quantile(as.numeric(paste(collect_list(as.character(carat))))))
Error: org.apache.spark.sql.AnalysisException: Undefined function: 'QUANTILE'. This function is neither a registered temporary function nor a permanent function registered in the database 'tsci
'.; line 1 pos 16
对于分组数据,最好的选择是 percentile_approx
:
diamonds_sdl %>% group_by(color) %>% summarise(q1 = percentile_approx(carat, .1))
# Source: lazy query [?? x 2]
# Database: spark_connection
color q1
<chr> <dbl>
1 H 0.310
2 F 0.310
3 G 0.310
4 I 0.320
5 J 0.400
6 D 0.310
7 E 0.300
但是,这需要启用 Hive 支持的 Spark,并且效率低于内置 approxQuantile
。