如何在 filter() 中使用 sparklyr/dplyr n_distinct() 在 Azure databricks 的 spark dataframe 中使用条件过滤器数据

How to use sparklyr/dplyr n_distinct() in filter() to use conditional filter data in spark dataframe in Azure databricks

我在 Azure databricks 中有一个大型数据集作为 Spark 数据框,并使用 R 代码分析数据。我正在将在本地桌面 RStudio 中运行的 R 代码转换为 Databricks R 代码。 我正在尝试根据 n_distinct(column)>2 过滤大型 spark 数据帧,以进行进一步分析。

我尝试将工作的本地桌面 Rstidio 代码用于 Azure databricks 中的 RStudio。

需要帮助将 "filter( n_distinct(carb)>2)" 转换为 spark 代码

用于数据块中的 Rstudio 或 R 笔记本。

## working desktop R code
library(dplyr)
set.seed(10)
df <- data.frame(mtcars)
## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df  %>%  group_by(cyl)  %>% filter( n_distinct(carb)>2)

df.dt1
## Databricks  - RStudio code
set.seed(10)

## use the mtcars dataset
df <- data.frame(mtcars)

## copying to Spark

df.spark <- copy_to(sc, df, "df_spark", overwrite = TRUE)

## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df.spark %>% group_by(cyl)  %>% filter(dplyr::n_distinct(carb)>2) %>% collect()

错误:Window函数distinct()不受此数据库支持

预期输出如下

cyl disp    hp  drat    wt  qsec    vs  am  gear    carb
<dbl>   <dbl>   <dbl>   <dbl>   <dbl>   <dbl>   <dbl>   <dbl>   <dbl>   <dbl>
6   160 110 3.9 2.62    16.46   0   1   4   4
6   160 110 3.9 2.875   17.02   0   1   4   4
6   258 110 3.08    3.215   19.44   1   0   3   1
8   360 175 3.15    3.44    17.02   0   0   3   2
6   225 105 2.76    3.46    20.22   1   0   3   1
8   360 245 3.21    3.57    15.84   0   0   3   4
6   167.6   123 3.92    3.44    18.3    1   0   4   4
6   167.6   123 3.92    3.44    18.9    1   0   4   4
8   275.8   180 3.07    4.07    17.4    0   0   3   3
8   275.8   180 3.07    3.73    17.6    0   0   3   3

结果数据集将只有来自 "cyl" 6 和 8 的记录,它们分别具有独特的计数 "carb" 3 和 4,而 cyl 4 被省略,因为它具有独特的碳水化合物计数2

## actual working code from my dataset in RStudio in Databricks

multi_contract <- Cust_details %>%   

group_by(CustomerID)  %>% 

## filter records for customers having more than one contract
filter(n_distinct(ContractType)>1)


此代码的问题是处理 100 万条记录需要大约 1 小时,而生成的数据集只有 41k 条记录。 所以在 sparklyr 或 sparkR 中必须有更好的方法来做到这一点。

这是一种计算不使用 distinct 的组 B 中某个值 A 的不同观察值的方法:

df %>%
  distinct(A, B) %>%
  group_by(B) %>%
  summarise(distinct_A = n())

您可以在分组列 B 上将结果与原始 df 进行内部联接以获得所需的结果。就你的例子而言,

sc <- spark_connect(master = "local")

mtcars_spark <- sdf_copy_to(sc, mtcars, "mtcars_spark")

keep_cyl <- mtcars_spark %>% 
  distinct(cyl, carb) %>%
  group_by(cyl) %>%
  summarise(distinct_carb_count = n()) %>%
  filter(distinct_carb_count > 2)

inner_join(keep_cyl, mtcars_spark)