过滤 DataFrame 最有效的方法是什么

What's the most efficient way to filter a DataFrame

...通过检查列的值是否在 seq.
中 可能我解释得不是很好,我主要是想要这个(用正则SQL来表达):DF_Column IN seq?

首先,我使用 broadcast var(我放置序列的位置)、UDF(进行检查)和 registerTempTable.
问题是我没有测试它,因为我 运行 进入 known bug 显然只在 registerTempTableScalaIDE 一起使用时出现。

我最终从 seq 中创建了一个新的 DataFrame 并对其进行内部连接(交集),但我怀疑这是完成任务的最佳方式。

谢谢

编辑:(回复@YijieShen):
如何根据一个 DataFrame 的列的元素是否在另一个 DF 的列(如 SQL select * from A where login in (select username from B))来执行 filter

例如: 第一个 DF:

login      count
login1     192  
login2     146  
login3     72   

第二个 DF:

username
login2
login3
login4

结果:

login      count
login2     146  
login3     72   

尝试次数:
EDIT-2: 我认为,既然错误已修复,这些应该可以工作。 结束 EDIT-2

ordered.select("login").filter($"login".contains(empLogins("username")))

ordered.select("login").filter($"login" in empLogins("username"))

两者都抛出 Exception in thread "main" org.apache.spark.sql.AnalysisException,分别为:

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter Contains(login#8, username#10);

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter login#8 IN (username#10);

我的代码(按照你第一个方法的描述)Spark 1.4.0-SNAPSHOT 这两个配置下正常运行:

  • Intellij IDEA's test
  • Spark Standalone cluster 有 8 个节点(1 个主节点,7 个工作节点)

请检查是否存在差异

val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")

val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))

xdf.show()
filtered.show()

输出

name cnt
login1 192
login2 146
login3 72

name cnt
login3 72

  1. 您应该广播 Set,而不是 Array,比线性搜索快得多。

  2. 您可以使 Eclipse 运行 成为您的 Spark 应用程序。方法如下:

正如邮件列表中指出的那样,spark-sql 假定其 类 由原始类加载器加载。在 Eclipse 中情况并非如此,如果 Java 和 Scala 库作为引导类路径的一部分加载,而用户代码及其依赖项位于另一个类路径中。您可以在启动配置对话框中轻松修复该问题:

  • 从 "Bootstrap" 条目中删除 Scala 库和 Scala 编译器
  • 将(作为外部 jar)scala-reflectscala-libraryscala-compiler 添加到用户条目。

对话框应如下所示:

Edit: The Spark bug was fixed and this workaround is no longer necessary (since v. 1.4.0)