Spark sql: How do I filter a dataframe twice and then join together?
I'm trying to filter and join to perform a simple pivot, but I'm getting very strange results.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val people = Array((1, "sam"), (2, "joe"), (3, "sally"), (4, "joanna"))
val accounts = Array(
  (1, "checking", 100.0),
  (1, "savings", 300.0),
  (2, "savings", 1000.0),
  (3, "carloan", 12000.0),
  (3, "checking", 400.0)
)
val t1 = sc.makeRDD(people).toDF("uid", "name")
val t2 = sc.makeRDD(accounts).toDF("uid", "type", "amount")
val t2c = t2.filter(t2("type") <=> "checking")
val t2s = t2.filter(t2("type") <=> "savings")
t1.
  join(t2c, t1("uid") <=> t2c("uid"), "left").
  join(t2s, t1("uid") <=> t2s("uid"), "left").
  take(10)
Wrong results:
Array(
  [1,sam,1,checking,100.0,1,savings,300.0],
  [1,sam,1,checking,100.0,2,savings,1000.0],
  [2,joe,null,null,null,null,null,null],
  [3,sally,3,checking,400.0,1,savings,300.0],
  [3,sally,3,checking,400.0,2,savings,1000.0],
  [4,joanna,null,null,null,null,null,null]
)
The way I can force it to work correctly is to create a new DataFrame for each filter:
val t2a = sc.makeRDD(accounts).toDF("uid", "type", "amount")
val t2s = t2a.filter(t2a("type") <=> "savings")
t1.
  join(t2c, t1("uid") <=> t2c("uid"), "left").
  join(t2s, t1("uid") <=> t2s("uid"), "left").
  take(10)
Correct results:
Array(
  [1,sam,1,checking,100.0,1,savings,300.0],
  [2,joe,null,null,null,2,savings,1000.0],
  [3,sally,3,checking,400.0,null,null,null],
  [4,joanna,null,null,null,null,null,null]
)
This workaround isn't viable; is there a better way?
I filed this as a bug:
https://issues.apache.org/jira/browse/SPARK-15063
The workaround is to use an alternative form of the join:
t1.
  join(t2c, Seq("uid"), "left").
  join(t2s, Seq("uid"), "left").
  take(10)
There may be a bug that prevents the original form from working.
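The root cause appears to be lineage: t2c and t2s are both derived from t2, so in the second join the analyzer resolves t2s("uid") against the uid column that the first join already introduced, and the condition no longer means what it says. Another workaround is to alias each filtered copy and join through qualified column names. A sketch (the alias names "c" and "s" are my own):

val t2c = t2.filter(t2("type") <=> "checking").as("c")
val t2s = t2.filter(t2("type") <=> "savings").as("s")
t1.
  join(t2c, t1("uid") <=> $"c.uid", "left").
  join(t2s, t1("uid") <=> $"s.uid", "left").
  take(10)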
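And since the end goal is a simple pivot, the double self-join can be skipped entirely with groupBy/pivot (available since Spark 1.6). A minimal sketch (the byType name is mine; note the output shape differs, with one amount column per account type instead of the full account rows):

// Pivot account types into columns, one row per uid, then join once.
val byType = t2.
  groupBy("uid").
  pivot("type", Seq("checking", "savings")).
  sum("amount")
t1.join(byType, Seq("uid"), "left").take(10)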