Is sample_n really a random sample when used with sparklyr?
I have 500 million rows in a Spark data frame. I'm interested in using sample_n() from dplyr because it lets me explicitly specify the sample size I want. If I were to use sparklyr::sdf_sample(), I would first have to compute sdf_nrow(), then derive the fraction of the data as sample_size / nrow, and then pass that fraction to sdf_sample(). This isn't a big deal, but sdf_nrow() can take a while to complete.
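For reference, the workaround I'm describing would look roughly like this (a sketch; sample_size is just a placeholder for the number of rows I want):

sample_size <- 300                       # placeholder: desired number of rows
n_total     <- sdf_nrow(spark_data)      # full row count -- this is the slow step
frac        <- sample_size / n_total
sampled     <- spark_data %>%
  sdf_sample(fraction = frac, replacement = FALSE, seed = 1)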
So, using dplyr::sample_n() directly would be ideal. However, after some testing, it doesn't look like sample_n() is random. In fact, the results are identical to head()! It would be a major issue if, instead of sampling rows at random, the function simply returned the first n rows.
Can anyone else confirm this? Is sdf_sample() my best option?
# install.packages("gapminder")
library(gapminder)
library(sparklyr)
library(purrr)
sc <- spark_connect(master = "yarn-client")
spark_data <- sdf_import(gapminder, sc, "gapminder")
> # Appears to be random
> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 58.83397
> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 60.31693
> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 59.38692
>
>
> # Appears to be random
> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 60.48903
> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 59.44187
> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 59.27986
>
>
> # Does not appear to be random
> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 57.78434
> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 57.78434
> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
sample_mean
<dbl>
1 57.78434
>
>
>
> # === Test sample_n() ===
> sample_mean <- list()
>
> for(i in 1:20){
+
+ sample_mean[i] <- spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp)) %>% collect() %>% pull()
+
+ }
>
>
> sample_mean %>% flatten_dbl() %>% mean()
[1] 57.78434
> sample_mean %>% flatten_dbl() %>% sd()
[1] 0
>
>
> # === Test head() ===
> spark_data %>%
+ head(300) %>%
+ pull(lifeExp) %>%
+ mean()
[1] 57.78434
No, it is not. If you inspect the execution plan, you will see that it is nothing more than a limit.
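The plans below were printed with an optimizedPlan() helper. Here is a minimal sketch of such a helper, assuming an active sparklyr connection; the name and wrapper are my own, and it simply reaches into the Catalyst query execution via invoke():

optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%        # underlying Spark DataFrame (Java object reference)
    invoke("queryExecution") %>% # org.apache.spark.sql.execution.QueryExecution
    invoke("optimizedPlan")      # the optimized logical plan
}

Running it on the sampled query gives: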
spark_data %>% sample_n(300) %>% optimizedPlan()
<jobj[168]>
org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
GlobalLimit 300
+- LocalLimit 300
+- InMemoryRelation [country#151, continent#152, year#153, lifeExp#154, pop#155, gdpPercap#156], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `gapminder`
+- Scan ExistingRDD[country#151,continent#152,year#153,lifeExp#154,pop#155,gdpPercap#156]
show_query() confirms this further:
spark_data %>% sample_n(300) %>% show_query()
<SQL>
SELECT *
FROM (SELECT *
FROM `gapminder` TABLESAMPLE (300 rows) ) `hntcybtgns`
and so does the visualized execution plan (not reproduced here).
Finally, if you check the Spark source, you will see that this case is implemented with a plain LIMIT:
case ctx: SampleByRowsContext =>
Limit(expression(ctx.expression), query)
I believe this semantics has been inherited from Hive, where the equivalent query takes the first n rows from each input split.
In practice, taking a sample of an exact size is simply very expensive, and you should avoid it unless strictly necessary (the same applies to large LIMITs).
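If an exact-size sample is genuinely required, one expensive-but-exact option (my sketch, not part of the answer above) is to order by a random key and keep the first n rows; this forces Spark to evaluate rand() for every row and sort by it, which is exactly the cost being warned about:

library(dplyr)

exact_sample <- spark_data %>%
  mutate(rnd = rand()) %>%   # rand() is passed through to Spark SQL
  arrange(rnd) %>%           # full ordering by the random key -- the expensive part
  head(300) %>%              # translated to LIMIT 300
  select(-rnd)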