如何在 filter() 中使用 sparklyr/dplyr n_distinct() 在 Azure databricks 的 spark dataframe 中使用条件过滤器数据
How to use sparklyr/dplyr n_distinct() in filter() to use conditional filter data in spark dataframe in Azure databricks
我在 Azure databricks 中有一个大型数据集作为 Spark 数据框,并使用 R 代码分析数据。我正在将在本地桌面 RStudio 中运行的 R 代码转换为 Databricks R 代码。
我正在尝试根据 n_distinct(column)>2 过滤大型 spark 数据帧,以进行进一步分析。
我尝试将工作的本地桌面 Rstidio 代码用于 Azure databricks 中的 RStudio。
需要帮助将 "filter( n_distinct(carb)>2)" 转换为 spark 代码
用于数据块中的 Rstudio 或 R 笔记本。
## working desktop R code
library(dplyr)
set.seed(10)
df <- data.frame(mtcars)
## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df %>% group_by(cyl) %>% filter( n_distinct(carb)>2)
df.dt1
## Databricks - RStudio code
set.seed(10)
## use the mtcars dataset
df <- data.frame(mtcars)
## copying to Spark
df.spark <- copy_to(sc, df, "df_spark", overwrite = TRUE)
## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df.spark %>% group_by(cyl) %>% filter(dplyr::n_distinct(carb)>2) %>% collect()
错误:Window函数distinct()
不受此数据库支持
预期输出如下
cyl disp hp drat wt qsec vs am gear carb
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
6 160 110 3.9 2.62 16.46 0 1 4 4
6 160 110 3.9 2.875 17.02 0 1 4 4
6 258 110 3.08 3.215 19.44 1 0 3 1
8 360 175 3.15 3.44 17.02 0 0 3 2
6 225 105 2.76 3.46 20.22 1 0 3 1
8 360 245 3.21 3.57 15.84 0 0 3 4
6 167.6 123 3.92 3.44 18.3 1 0 4 4
6 167.6 123 3.92 3.44 18.9 1 0 4 4
8 275.8 180 3.07 4.07 17.4 0 0 3 3
8 275.8 180 3.07 3.73 17.6 0 0 3 3
结果数据集将只有来自 "cyl" 6 和 8 的记录,它们分别具有独特的计数 "carb" 3 和 4,而 cyl 4 被省略,因为它具有独特的碳水化合物计数2
## actual working code from my dataset in RStudio in Databricks
multi_contract <- Cust_details %>%
group_by(CustomerID) %>%
## filter records for customers having more than one contract
filter(n_distinct(ContractType)>1)
此代码的问题是处理 100 万条记录需要大约 1 小时,而生成的数据集只有 41k 条记录。
所以在 sparklyr 或 sparkR 中必须有更好的方法来做到这一点。
这是一种计算不使用 distinct
的组 B 中某个值 A 的不同观察值的方法:
df %>%
distinct(A, B) %>%
group_by(B) %>%
summarise(distinct_A = n())
您可以在分组列 B
上将结果与原始 df 进行内部联接以获得所需的结果。就你的例子而言,
sc <- spark_connect(master = "local")
mtcars_spark <- sdf_copy_to(sc, mtcars, "mtcars_spark")
keep_cyl <- mtcars_spark %>%
distinct(cyl, carb) %>%
group_by(cyl) %>%
summarise(distinct_carb_count = n()) %>%
filter(distinct_carb_count > 2)
inner_join(keep_cyl, mtcars_spark)
我在 Azure databricks 中有一个大型数据集作为 Spark 数据框,并使用 R 代码分析数据。我正在将在本地桌面 RStudio 中运行的 R 代码转换为 Databricks R 代码。 我正在尝试根据 n_distinct(column)>2 过滤大型 spark 数据帧,以进行进一步分析。
我尝试将工作的本地桌面 Rstidio 代码用于 Azure databricks 中的 RStudio。
需要帮助将 "filter( n_distinct(carb)>2)" 转换为 spark 代码
用于数据块中的 Rstudio 或 R 笔记本。
## working desktop R code
library(dplyr)
set.seed(10)
df <- data.frame(mtcars)
## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df %>% group_by(cyl) %>% filter( n_distinct(carb)>2)
df.dt1
## Databricks - RStudio code
set.seed(10)
## use the mtcars dataset
df <- data.frame(mtcars)
## copying to Spark
df.spark <- copy_to(sc, df, "df_spark", overwrite = TRUE)
## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df.spark %>% group_by(cyl) %>% filter(dplyr::n_distinct(carb)>2) %>% collect()
错误:Window函数distinct()
不受此数据库支持
预期输出如下
cyl disp hp drat wt qsec vs am gear carb
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
6 160 110 3.9 2.62 16.46 0 1 4 4
6 160 110 3.9 2.875 17.02 0 1 4 4
6 258 110 3.08 3.215 19.44 1 0 3 1
8 360 175 3.15 3.44 17.02 0 0 3 2
6 225 105 2.76 3.46 20.22 1 0 3 1
8 360 245 3.21 3.57 15.84 0 0 3 4
6 167.6 123 3.92 3.44 18.3 1 0 4 4
6 167.6 123 3.92 3.44 18.9 1 0 4 4
8 275.8 180 3.07 4.07 17.4 0 0 3 3
8 275.8 180 3.07 3.73 17.6 0 0 3 3
结果数据集将只有来自 "cyl" 6 和 8 的记录,它们分别具有独特的计数 "carb" 3 和 4,而 cyl 4 被省略,因为它具有独特的碳水化合物计数2
## actual working code from my dataset in RStudio in Databricks
multi_contract <- Cust_details %>%
group_by(CustomerID) %>%
## filter records for customers having more than one contract
filter(n_distinct(ContractType)>1)
此代码的问题是处理 100 万条记录需要大约 1 小时,而生成的数据集只有 41k 条记录。 所以在 sparklyr 或 sparkR 中必须有更好的方法来做到这一点。
这是一种计算不使用 distinct
的组 B 中某个值 A 的不同观察值的方法:
df %>%
distinct(A, B) %>%
group_by(B) %>%
summarise(distinct_A = n())
您可以在分组列 B
上将结果与原始 df 进行内部联接以获得所需的结果。就你的例子而言,
sc <- spark_connect(master = "local")
mtcars_spark <- sdf_copy_to(sc, mtcars, "mtcars_spark")
keep_cyl <- mtcars_spark %>%
distinct(cyl, carb) %>%
group_by(cyl) %>%
summarise(distinct_carb_count = n()) %>%
filter(distinct_carb_count > 2)
inner_join(keep_cyl, mtcars_spark)