过滤 DataFrame 最有效的方法是什么
What's the most efficient way to filter a DataFrame
...通过检查列的值是否在 seq
.
中
可能我解释得不是很好,我主要是想要这个(用正则SQL来表达):DF_Column IN seq
?
首先,我使用 broadcast var
(我放置序列的位置)、UDF
(进行检查)和 registerTempTable
.
问题是我没有测试它,因为我 运行 进入 known bug 显然只在 registerTempTable
与 ScalaIDE 一起使用时出现。
我最终从 seq
中创建了一个新的 DataFrame
并对其进行内部连接(交集),但我怀疑这是完成任务的最佳方式。
谢谢
编辑:(回复@YijieShen):
如何根据一个 DataFrame
的列的元素是否在另一个 DF 的列(如 SQL select * from A where login in (select username from B)
)来执行 filter
?
例如:
第一个 DF:
login count
login1 192
login2 146
login3 72
第二个 DF:
username
login2
login3
login4
结果:
login count
login2 146
login3 72
尝试次数:
EDIT-2: 我认为,既然错误已修复,这些应该可以工作。 结束 EDIT-2
ordered.select("login").filter($"login".contains(empLogins("username")))
和
ordered.select("login").filter($"login" in empLogins("username"))
两者都抛出 Exception in thread "main" org.apache.spark.sql.AnalysisException
,分别为:
resolved attribute(s) username#10 missing from login#8 in operator
!Filter Contains(login#8, username#10);
和
resolved attribute(s) username#10 missing from login#8 in operator
!Filter login#8 IN (username#10);
我的代码(按照你第一个方法的描述) 在Spark 1.4.0-SNAPSHOT
这两个配置下正常运行:
Intellij IDEA's test
Spark Standalone cluster
有 8 个节点(1 个主节点,7 个工作节点)
请检查是否存在差异
val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))
xdf.show()
filtered.show()
输出
name cnt
login1 192
login2 146
login3 72
name cnt
login3 72
您应该广播 Set
,而不是 Array
,比线性搜索快得多。
您可以使 Eclipse 运行 成为您的 Spark 应用程序。方法如下:
正如邮件列表中指出的那样,spark-sql 假定其 类 由原始类加载器加载。在 Eclipse 中情况并非如此,如果 Java 和 Scala 库作为引导类路径的一部分加载,而用户代码及其依赖项位于另一个类路径中。您可以在启动配置对话框中轻松修复该问题:
- 从 "Bootstrap" 条目中删除 Scala 库和 Scala 编译器
- 将(作为外部 jar)
scala-reflect
、scala-library
和 scala-compiler
添加到用户条目。
对话框应如下所示:
Edit: The Spark bug was fixed and this workaround is no longer necessary (since v. 1.4.0)
...通过检查列的值是否在 seq
.
中
可能我解释得不是很好,我主要是想要这个(用正则SQL来表达):DF_Column IN seq
?
首先,我使用 broadcast var
(我放置序列的位置)、UDF
(进行检查)和 registerTempTable
.
问题是我没有测试它,因为我 运行 进入 known bug 显然只在 registerTempTable
与 ScalaIDE 一起使用时出现。
我最终从 seq
中创建了一个新的 DataFrame
并对其进行内部连接(交集),但我怀疑这是完成任务的最佳方式。
谢谢
编辑:(回复@YijieShen):
如何根据一个 DataFrame
的列的元素是否在另一个 DF 的列(如 SQL select * from A where login in (select username from B)
)来执行 filter
?
例如: 第一个 DF:
login count
login1 192
login2 146
login3 72
第二个 DF:
username
login2
login3
login4
结果:
login count
login2 146
login3 72
尝试次数:
EDIT-2: 我认为,既然错误已修复,这些应该可以工作。 结束 EDIT-2
ordered.select("login").filter($"login".contains(empLogins("username")))
和
ordered.select("login").filter($"login" in empLogins("username"))
两者都抛出 Exception in thread "main" org.apache.spark.sql.AnalysisException
,分别为:
resolved attribute(s) username#10 missing from login#8 in operator
!Filter Contains(login#8, username#10);
和
resolved attribute(s) username#10 missing from login#8 in operator
!Filter login#8 IN (username#10);
我的代码(按照你第一个方法的描述) 在Spark 1.4.0-SNAPSHOT
这两个配置下正常运行:
Intellij IDEA's test
Spark Standalone cluster
有 8 个节点(1 个主节点,7 个工作节点)
请检查是否存在差异
val bc = sc.broadcast(Array[String]("login3", "login4"))
val x = Array(("login1", 192), ("login2", 146), ("login3", 72))
val xdf = sqlContext.createDataFrame(x).toDF("name", "cnt")
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)
val filtered = xdf.filter(sqlfunc(col("name")))
xdf.show()
filtered.show()
输出
name cnt
login1 192
login2 146
login3 72name cnt
login3 72
您应该广播
Set
,而不是Array
,比线性搜索快得多。您可以使 Eclipse 运行 成为您的 Spark 应用程序。方法如下:
正如邮件列表中指出的那样,spark-sql 假定其 类 由原始类加载器加载。在 Eclipse 中情况并非如此,如果 Java 和 Scala 库作为引导类路径的一部分加载,而用户代码及其依赖项位于另一个类路径中。您可以在启动配置对话框中轻松修复该问题:
- 从 "Bootstrap" 条目中删除 Scala 库和 Scala 编译器
- 将(作为外部 jar)
scala-reflect
、scala-library
和scala-compiler
添加到用户条目。
对话框应如下所示:
Edit: The Spark bug was fixed and this workaround is no longer necessary (since v. 1.4.0)