加入这两个 Spark DataFrame 的正确方法是什么?
What is the right way to join these 2 Spark DataFrames?
假设我有 2 个 spark DataFrame:
val addStuffDf = Seq(
("A", "2018-03-22", 5),
("A", "2018-03-24", 1),
("B", "2018-03-24, 3))
.toDF("user", "dt", "count")
val removedStuffDf = Seq(
("C", "2018-03-25", 10),
("A", "2018-03-24", 5),
("B", "2018-03-25", 1)
).toDF("user", "dt", "count")
最后我想得到一个包含这样的汇总统计信息的数据框(实际上顺序并不重要):
+----+----------+-----+-------+
|user| dt|added|removed|
+----+----------+-----+-------+
| A|2018-03-22| 5| 0|
| A|2018-03-24| 1| 5|
| B|2018-03-24| 3| 0|
| B|2018-03-25| 0| 1|
| C|2018-03-25| 0| 10|
+----+----------+-----+-------+
很明显,我可以简单地重命名 "step 0" 处的 "count" 列,以便拥有数据帧 df1
和 df2
val df1 = addedDf.withColumnRenamed("count", "added")
df1.show()
+----+----------+-----+
|user| dt|added|
+----+----------+-----+
| A|2018-03-22| 5|
| A|2018-03-24| 1|
| B|2018-03-24| 3|
+----+----------+-----+
val df2 = removedDf.withColumnRenamed("count", "removed")
df2.show()
+----+----------+-------+
|user| dt|applied|
+----+----------+-------+
| C|2018-03-25| 10|
| A|2018-03-24| 5|
| B|2018-03-25| 1|
+----+----------+-------+
但现在我无法定义 "step 1" - 即无法确定将 df1 和 df2 压缩在一起的转换。
从逻辑的角度来看 full_outer
join 将我需要的所有行都放在一个 DF 中,但是我需要以某种方式合并重复的列:
df1.as('d1)
.join(df2.as('d2),
($"d1.user"===$"d2.user" && $"d1.dt"===$"d2.dt"),
"full_outer")
.show()
+----+----------+-----+----+----------+-------+
|user| dt|added|user| dt|applied|
+----+----------+-----+----+----------+-------+
|null| null| null| C|2018-03-25| 10|
|null| null| null| B|2018-03-25| 1|
| B|2018-03-24| 3|null| null| null|
| A|2018-03-22| 5|null| null| null|
| A|2018-03-24| 1| A|2018-03-24| 5|
+----+----------+-----+----+----------+-------+
如何将这些 user
和 dt
列合并在一起?而且,总的来说 - 我是在使用正确的方法来解决我的问题,还是有更多 straightforward/efficient 的解决方案?
由于要连接的两个 DataFrame 的列具有匹配的名称,因此使用 Seq("user", "dt")
作为连接条件将导致您想要的合并 table:
val addStuffDf = Seq(
("A", "2018-03-22", 5),
("A", "2018-03-24", 1),
("B", "2018-03-24", 3)
).toDF("user", "dt", "count")
val removedStuffDf = Seq(
("C", "2018-03-25", 10),
("A", "2018-03-24", 5),
("B", "2018-03-25", 1)
).toDF("user", "dt", "count")
val df1 = addStuffDf.withColumnRenamed("count", "added")
val df2 = removedStuffDf.withColumnRenamed("count", "removed")
df1.as('d1).join(df2.as('d2), Seq("user", "dt"), "full_outer").
na.fill(0).
show
// +----+----------+-----+-------+
// |user| dt|added|removed|
// +----+----------+-----+-------+
// | C|2018-03-25| 0| 10|
// | B|2018-03-25| 0| 1|
// | B|2018-03-24| 3| 0|
// | A|2018-03-22| 5| 0|
// | A|2018-03-24| 1| 5|
// +----+----------+-----+-------+
假设我有 2 个 spark DataFrame:
val addStuffDf = Seq(
("A", "2018-03-22", 5),
("A", "2018-03-24", 1),
("B", "2018-03-24, 3))
.toDF("user", "dt", "count")
val removedStuffDf = Seq(
("C", "2018-03-25", 10),
("A", "2018-03-24", 5),
("B", "2018-03-25", 1)
).toDF("user", "dt", "count")
最后我想得到一个包含这样的汇总统计信息的数据框(实际上顺序并不重要):
+----+----------+-----+-------+
|user| dt|added|removed|
+----+----------+-----+-------+
| A|2018-03-22| 5| 0|
| A|2018-03-24| 1| 5|
| B|2018-03-24| 3| 0|
| B|2018-03-25| 0| 1|
| C|2018-03-25| 0| 10|
+----+----------+-----+-------+
很明显,我可以简单地重命名 "step 0" 处的 "count" 列,以便拥有数据帧 df1
和 df2
val df1 = addedDf.withColumnRenamed("count", "added")
df1.show()
+----+----------+-----+
|user| dt|added|
+----+----------+-----+
| A|2018-03-22| 5|
| A|2018-03-24| 1|
| B|2018-03-24| 3|
+----+----------+-----+
val df2 = removedDf.withColumnRenamed("count", "removed")
df2.show()
+----+----------+-------+
|user| dt|applied|
+----+----------+-------+
| C|2018-03-25| 10|
| A|2018-03-24| 5|
| B|2018-03-25| 1|
+----+----------+-------+
但现在我无法定义 "step 1" - 即无法确定将 df1 和 df2 压缩在一起的转换。
从逻辑的角度来看 full_outer
join 将我需要的所有行都放在一个 DF 中,但是我需要以某种方式合并重复的列:
df1.as('d1)
.join(df2.as('d2),
($"d1.user"===$"d2.user" && $"d1.dt"===$"d2.dt"),
"full_outer")
.show()
+----+----------+-----+----+----------+-------+
|user| dt|added|user| dt|applied|
+----+----------+-----+----+----------+-------+
|null| null| null| C|2018-03-25| 10|
|null| null| null| B|2018-03-25| 1|
| B|2018-03-24| 3|null| null| null|
| A|2018-03-22| 5|null| null| null|
| A|2018-03-24| 1| A|2018-03-24| 5|
+----+----------+-----+----+----------+-------+
如何将这些 user
和 dt
列合并在一起?而且,总的来说 - 我是在使用正确的方法来解决我的问题,还是有更多 straightforward/efficient 的解决方案?
由于要连接的两个 DataFrame 的列具有匹配的名称,因此使用 Seq("user", "dt")
作为连接条件将导致您想要的合并 table:
val addStuffDf = Seq(
("A", "2018-03-22", 5),
("A", "2018-03-24", 1),
("B", "2018-03-24", 3)
).toDF("user", "dt", "count")
val removedStuffDf = Seq(
("C", "2018-03-25", 10),
("A", "2018-03-24", 5),
("B", "2018-03-25", 1)
).toDF("user", "dt", "count")
val df1 = addStuffDf.withColumnRenamed("count", "added")
val df2 = removedStuffDf.withColumnRenamed("count", "removed")
df1.as('d1).join(df2.as('d2), Seq("user", "dt"), "full_outer").
na.fill(0).
show
// +----+----------+-----+-------+
// |user| dt|added|removed|
// +----+----------+-----+-------+
// | C|2018-03-25| 0| 10|
// | B|2018-03-25| 0| 1|
// | B|2018-03-24| 3| 0|
// | A|2018-03-22| 5| 0|
// | A|2018-03-24| 1| 5|
// +----+----------+-----+-------+