如何跨 Array[DataFrame] 组合(加入)信息
How to combine (join) information across an Array[DataFrame]
我有一个 Array[DataFrame],我想检查每个数据框的每一行,按列的值是否有任何变化。假设我有第一行三个数据框,例如:
(0,1.0,0.4,0.1)
(0,3.0,0.2,0.1)
(0,5.0,0.4,0.1)
第一列是 ID,我对这个 ID 的理想输出是:
(0, 1, 1, 0)
表示第二列和第三列发生了变化,而第三列没有变化。
我在这里附上一些数据来复制我的设置
val rdd = sc.parallelize(Array((0,1.0,0.4,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.9,0.2),
(3,0.9,0.2,0.2),
(4,0.3,0.5,0.5)))
val rdd2 = sc.parallelize(Array((0,3.0,0.2,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.5,0.2),
(3,0.8,0.1,0.1),
(4,0.3,0.5,0.5)))
val rdd3 = sc.parallelize(Array((0,5.0,0.4,0.1),
(1,0.5,0.3,0.3),
(2,0.3,0.3,0.5),
(3,0.3,0.3,0.1),
(4,0.3,0.5,0.5)))
val df = rdd.toDF("id", "prop1", "prop2", "prop3")
val df2 = rdd2.toDF("id", "prop1", "prop2", "prop3")
val df3 = rdd3.toDF("id", "prop1", "prop2", "prop3")
val result:Array[DataFrame] = new Array[DataFrame](3)
result.update(0, df)
result.update(1,df2)
result.update(2,df3)
如何映射数组并获取输出?
首先我们需要将所有 DataFrames
连接在一起。
val combined = result.reduceLeft((a,b) => a.join(b,"id"))
为了比较同一标签的所有列(例如,"prod1"),我发现在 RDD 级别上操作更容易(至少对我而言)。我们首先将数据转化为(id, Seq[Double])
.
val finalResults = combined.rdd.map{
x =>
(x.getInt(0), x.toSeq.tail.map(_.asInstanceOf[Double]))
}.map{
case(i,d) =>
def checkAllEqual(l: Seq[Double]) = if(l.toSet.size == 1) 0 else 1
val g = d.grouped(3).toList
val g1 = checkAllEqual(g.map(x => x(0)))
val g2 = checkAllEqual(g.map(x => x(1)))
val g3 = checkAllEqual(g.map(x => x(2)))
(i, g1,g2,g3)
}.toDF("id", "prod1", "prod2", "prod3")
finalResults.show()
这将打印:
+---+-----+-----+-----+
| id|prod1|prod2|prod3|
+---+-----+-----+-----+
| 0| 1| 1| 0|
| 1| 1| 0| 0|
| 2| 1| 1| 1|
| 3| 1| 1| 1|
| 4| 0| 0| 0|
+---+-----+-----+-----+
您可以将 countDistinct
与 groupBy
一起使用:
import org.apache.spark.sql.functions.{countDistinct}
val exprs = Seq("prop1", "prop2", "prop3")
.map(c => (countDistinct(c) > 1).cast("integer").alias(c))
val combined = result.reduce(_ unionAll _)
val aggregatedViaGroupBy = combined
.groupBy($"id")
.agg(exprs.head, exprs.tail: _*)
aggregatedViaGroupBy.show
// +---+-----+-----+-----+
// | id|prop1|prop2|prop3|
// +---+-----+-----+-----+
// | 0| 1| 1| 0|
// | 1| 1| 0| 0|
// | 2| 1| 1| 1|
// | 3| 1| 1| 1|
// | 4| 0| 0| 0|
// +---+-----+-----+-----+
我有一个 Array[DataFrame],我想检查每个数据框的每一行,按列的值是否有任何变化。假设我有第一行三个数据框,例如:
(0,1.0,0.4,0.1)
(0,3.0,0.2,0.1)
(0,5.0,0.4,0.1)
第一列是 ID,我对这个 ID 的理想输出是:
(0, 1, 1, 0)
表示第二列和第三列发生了变化,而第三列没有变化。 我在这里附上一些数据来复制我的设置
val rdd = sc.parallelize(Array((0,1.0,0.4,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.9,0.2),
(3,0.9,0.2,0.2),
(4,0.3,0.5,0.5)))
val rdd2 = sc.parallelize(Array((0,3.0,0.2,0.1),
(1,0.9,0.3,0.3),
(2,0.2,0.5,0.2),
(3,0.8,0.1,0.1),
(4,0.3,0.5,0.5)))
val rdd3 = sc.parallelize(Array((0,5.0,0.4,0.1),
(1,0.5,0.3,0.3),
(2,0.3,0.3,0.5),
(3,0.3,0.3,0.1),
(4,0.3,0.5,0.5)))
val df = rdd.toDF("id", "prop1", "prop2", "prop3")
val df2 = rdd2.toDF("id", "prop1", "prop2", "prop3")
val df3 = rdd3.toDF("id", "prop1", "prop2", "prop3")
val result:Array[DataFrame] = new Array[DataFrame](3)
result.update(0, df)
result.update(1,df2)
result.update(2,df3)
如何映射数组并获取输出?
首先我们需要将所有 DataFrames
连接在一起。
val combined = result.reduceLeft((a,b) => a.join(b,"id"))
为了比较同一标签的所有列(例如,"prod1"),我发现在 RDD 级别上操作更容易(至少对我而言)。我们首先将数据转化为(id, Seq[Double])
.
val finalResults = combined.rdd.map{
x =>
(x.getInt(0), x.toSeq.tail.map(_.asInstanceOf[Double]))
}.map{
case(i,d) =>
def checkAllEqual(l: Seq[Double]) = if(l.toSet.size == 1) 0 else 1
val g = d.grouped(3).toList
val g1 = checkAllEqual(g.map(x => x(0)))
val g2 = checkAllEqual(g.map(x => x(1)))
val g3 = checkAllEqual(g.map(x => x(2)))
(i, g1,g2,g3)
}.toDF("id", "prod1", "prod2", "prod3")
finalResults.show()
这将打印:
+---+-----+-----+-----+
| id|prod1|prod2|prod3|
+---+-----+-----+-----+
| 0| 1| 1| 0|
| 1| 1| 0| 0|
| 2| 1| 1| 1|
| 3| 1| 1| 1|
| 4| 0| 0| 0|
+---+-----+-----+-----+
您可以将 countDistinct
与 groupBy
一起使用:
import org.apache.spark.sql.functions.{countDistinct}
val exprs = Seq("prop1", "prop2", "prop3")
.map(c => (countDistinct(c) > 1).cast("integer").alias(c))
val combined = result.reduce(_ unionAll _)
val aggregatedViaGroupBy = combined
.groupBy($"id")
.agg(exprs.head, exprs.tail: _*)
aggregatedViaGroupBy.show
// +---+-----+-----+-----+
// | id|prop1|prop2|prop3|
// +---+-----+-----+-----+
// | 0| 1| 1| 0|
// | 1| 1| 0| 0|
// | 2| 1| 1| 1|
// | 3| 1| 1| 1|
// | 4| 0| 0| 0|
// +---+-----+-----+-----+