How to filter one Spark dataframe against another dataframe
I'm trying to filter one dataframe against another:
scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id")
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id")
Now I want to filter df1 and get back a dataframe containing all the rows of df1 where user_id is in df2("valid_id"). In other words, I want all rows of df1 where user_id is one of 2, 3, 4, 5 or 6.
scala> df1.select("user_id").filter($"user_id" in df2("valid_id"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20);
On the other hand, when I try to filter against a function, everything works fine:
scala> df1.select("user_id").filter(($"user_id" % 2) === 0)
res1: org.apache.spark.sql.DataFrame = [user_id: int]
Why am I getting this error? Is there something wrong with my syntax?
Following a comment, I tried a left outer join:
scala> df1.show
+-------+------------------+-------+
| name| score|user_id|
+-------+------------------+-------+
| user 1| 0.123| 1|
| user 2| 0.246| 2|
| user 3| 0.369| 3|
| user 4| 0.492| 4|
| user 5| 0.615| 5|
| user 6| 0.738| 6|
| user 7| 0.861| 7|
| user 8| 0.984| 8|
| user 9| 1.107| 9|
|user 10| 1.23| 10|
|user 11| 1.353| 11|
|user 12| 1.476| 12|
|user 13| 1.599| 13|
|user 14| 1.722| 14|
|user 15| 1.845| 15|
|user 16| 1.968| 16|
|user 17| 2.091| 17|
|user 18| 2.214| 18|
|user 19|2.3369999999999997| 19|
|user 20| 2.46| 20|
+-------+------------------+-------+
only showing top 20 rows
scala> df2.show
+--------+
|valid_id|
+--------+
| 2|
| 3|
| 4|
| 5|
| 6|
+--------+
scala> df1.join(df2, df1("user_id") === df2("valid_id"))
res6: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int]
scala> res6.collect
res7: Array[org.apache.spark.sql.Row] = Array()
scala> df1.join(df2, df1("user_id") === df2("valid_id"), "left_outer")
res8: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int]
scala> res8.count
res9: Long = 0
I am running Spark 1.5.0 with Scala 2.10.5.
You want a (regular) inner join, not an outer join :)
df1.join(df2, df1("user_id") === df2("valid_id"))
You can also write the code this way, passing the join type (INNER, LEFT_OUTER, RIGHT_OUTER, etc.) explicitly:
df1.join(df2, col("user_id") === col("valid_id"), "${type_of_join}")