如何使用 %in% 运算符过滤 Spark Dataframe?

How to Filter Spark Dataframe with %in% operator?

我有这个要过滤的数据集:

它包含如下唯一值:

user_unique <- movie_rating %>% 
  select(userId) %>% distinct() %>% count() %>% collect() %>%
  unlist %>% as.vector
movie_unique <- movie_rating %>% 
  select(movieId) %>% distinct() %>% count() %>% collect() %>%
  unlist %>% as.vector

user_unique_vector <- movie_rating %>% 
  select(userId) %>% distinct() %>% collect() %>%
  unlist %>% as.vector
movie_unique_vector <- movie_rating %>% 
  select(movieId) %>% distinct() %>% collect() %>%
  unlist %>% as.vector

然后我想过滤整个 DF 例如前 50 个现有的 movieID

movie_rating %>%
  filter(movieId %in% c(movie_unique_vector[1:50])) 

但是它return意外错误:

Error: org.apache.spark.sql.catalyst.parser.ParseException: 
no viable alternative at input '(`movieId` IN CASE'(line 3, pos 20)

== SQL ==
    SELECT *
FROM `movie_rating`
WHERE (`movieId` IN CASE WHEN ((1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)) 
THEN ((110, 46850, 147, 46967, 858, 47629, 1221, 48061, 1246, 48516, 1968, 48738, 2762, 48783, 2918, 49530, 2959, 50068, 4226, 50872, 4878, 51540, 5577, 53972, 33794, 54272, 54503,

我该如何过滤?

编辑:如果有人对 DF 来源感兴趣,请看这里:https://gofile.io/d/6RQvc1

我似乎无法找到直接使用基于矢量匹配的 %in% 运算符过滤 spark 数据帧的答案。所以我不得不通过直接从 SQL 过滤来创建自定义函数看起来像这样:

spark_filter_vector <- function(spark_df_name="", column_name_to_match="", 
                                vector_to_match=c(), log=FALSE, inverse=FALSE){
  string_sql <- ""
  if(inverse){
    string_sql <- paste0("SELECT * FROM ", spark_df_name, " WHERE ",column_name_to_match, " NOT IN(")
  }else{
    string_sql <- paste0("SELECT * FROM ", spark_df_name, " WHERE ",column_name_to_match, " IN(")
  }
  for(a in 1:length(vector_to_match)){
    if(log){
      print(paste0("a = ",a))
    }
    if(a < length(vector_to_match)){
      string_sql <- paste0(string_sql, vector_to_match[a],",")
    }
    else if(a == length(vector_to_match)){
      string_sql <- paste0(string_sql, vector_to_match[a],")")
    }
  }
  sdf_sql(spark_conn, string_sql)
}

这样我就可以像这样实现矢量匹配:

movie_filtered <- spark_filter_vector("movie_rating", "movieId", movie_unique_vector[1:50])
movie_filtered <- spark_filter_vector("movie_rating", "movieId", movie_unique_vector[c(2,7,8,12)])

考虑使用 'join' 而不是 %in%(尽管后者对于 R 用户来说可能更自然)。

  1. 创建包含前 50 部独特电影的 spark 数据框
  2. 然后,inner_join 使用原始 spark 数据帧 (movie_rating)。
movie_unique_sdf = movie_rating %>% 
  select(movieId) %>% 
  distinct() %>% 
  slice(1:50)

movie_rating %>%
  inner_join(movie_unique_sdf, by = "movieId")