Spark 2.0 DataSets groupByKey 和 divide 操作和类型安全
Spark 2.0 DataSets groupByKey and divide operation and type safety
我对 Spark 2.0 DataSets 非常满意,因为它具有编译时类型安全性。但是这里有几个我无法解决的问题,我也没有找到好的文档。
问题 #1 - 聚合列的除法运算 -
考虑下面的代码 -
我有一个 DataSet[MyCaseClass],我想在 c1、c2、c3 和 sum(c4) / 8 上进行 groupByKey。如果我只计算总和,下面的代码运行良好,但它给出了 divide(8) 的编译时错误。我想知道如何才能实现以下目标。
final case class MyClass (c1: String,
c2: String,
c3: String,
c4: Double)
val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded
import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
divide(8)). //this is breaking with exception
show()
如果我删除 .divide(8) 操作和 运行 以上命令,它会给我以下输出。
+-----------+-------------+
| key|sum(c4) |
+-----------+-------------+
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
+-----------+-------------+
问题 #2 - 将 groupedByKey 结果转换为另一个 Typed DataFrame -
现在我的问题的第二部分是我想再次输出一个类型化的数据集。为此,我有另一个案例 class(不确定是否需要),但我不确定如何使用分组结果进行映射 -
final case class AnotherClass(c1: String,
c2: String,
c3: String,
average: Double)
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception
但这再次失败并出现异常,因为按键结果分组没有直接映射到 AnotherClass。
PS :非常欢迎任何其他实现上述目标的解决方案。
第一个问题可以通过一直使用类型化列来解决(KeyValueGroupedDataset.agg
预计 TypedColumn(-s)
)
您可以将聚合结果定义为:
val eight = lit(8.0)
.as[Double] // Not necessary
val sumByEight = typedSum[MyClass](_.c4)
.divide(eight)
.as[Double] // Required
.name("div(sum(c4), 8)")
并将其插入以下代码:
val myCaseClass = Seq(
MyClass("a", "b", "c", 2.0),
MyClass("a", "b", "c", 3.0)
).toDS
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(sumByEight)
得到
+-------+---------------+
| key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]| 0.625|
+-------+---------------+
第二个问题是使用不符合数据形状的 class 的结果。正确的表示可能是:
case class AnotherClass(key: (String, String, String), sum: Double)
与上面定义的数据一起使用:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(typedSum[MyClass](_.c4).name("sum"))
.as[AnotherClass]
会给出:
+-------+---+
| key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+
但是如果Dataset[((String, String, String), Double)]
可以接受,这里就不需要.as[AnotherClass]
了。
你当然可以跳过所有这些,只需要mapGroups
(尽管并非没有性能损失):
import shapeless.syntax.std.tuple._ // A little bit of shapeless
val tuples = myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.mapGroups((group, iter) => group :+ iter.map(_.c4).sum)
结果
+---+---+---+---+
| _1| _2| _3| _4|
+---+---+---+---+
| a| b| c|5.0|
+---+---+---+---+
reduceGroups
可能是更好的选择:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))
结果 Dataset
:
+-------+-----------+
| _1| _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+
我对 Spark 2.0 DataSets 非常满意,因为它具有编译时类型安全性。但是这里有几个我无法解决的问题,我也没有找到好的文档。
问题 #1 - 聚合列的除法运算 - 考虑下面的代码 - 我有一个 DataSet[MyCaseClass],我想在 c1、c2、c3 和 sum(c4) / 8 上进行 groupByKey。如果我只计算总和,下面的代码运行良好,但它给出了 divide(8) 的编译时错误。我想知道如何才能实现以下目标。
final case class MyClass (c1: String,
c2: String,
c3: String,
c4: Double)
val myCaseClass: DataSet[MyCaseClass] = ??? // assume it's being loaded
import sparkSession.implicits._
import org.apache.spark.sql.expressions.scalalang.typed.{sum => typedSum}
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)").
divide(8)). //this is breaking with exception
show()
如果我删除 .divide(8) 操作和 运行 以上命令,它会给我以下输出。
+-----------+-------------+
| key|sum(c4) |
+-----------+-------------+
| [A1,F2,S1]| 80.0|
| [A1,F1,S1]| 40.0|
+-----------+-------------+
问题 #2 - 将 groupedByKey 结果转换为另一个 Typed DataFrame - 现在我的问题的第二部分是我想再次输出一个类型化的数据集。为此,我有另一个案例 class(不确定是否需要),但我不确定如何使用分组结果进行映射 -
final case class AnotherClass(c1: String,
c2: String,
c3: String,
average: Double)
myCaseClass.
groupByKey(myCaseClass =>
(myCaseClass.c1, myCaseClass.c2, myCaseClass.c3)).
agg(typedSum[MyCaseClass](_.c4).name("sum(c4)")).
as[AnotherClass] //this is breaking with exception
但这再次失败并出现异常,因为按键结果分组没有直接映射到 AnotherClass。
PS :非常欢迎任何其他实现上述目标的解决方案。
第一个问题可以通过一直使用类型化列来解决(KeyValueGroupedDataset.agg
预计 TypedColumn(-s)
)
您可以将聚合结果定义为:
val eight = lit(8.0)
.as[Double] // Not necessary
val sumByEight = typedSum[MyClass](_.c4)
.divide(eight)
.as[Double] // Required
.name("div(sum(c4), 8)")
并将其插入以下代码:
val myCaseClass = Seq(
MyClass("a", "b", "c", 2.0),
MyClass("a", "b", "c", 3.0)
).toDS
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(sumByEight)
得到
+-------+---------------+
| key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]| 0.625|
+-------+---------------+
第二个问题是使用不符合数据形状的 class 的结果。正确的表示可能是:
case class AnotherClass(key: (String, String, String), sum: Double)
与上面定义的数据一起使用:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(typedSum[MyClass](_.c4).name("sum"))
.as[AnotherClass]
会给出:
+-------+---+
| key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+
但是如果Dataset[((String, String, String), Double)]
可以接受,这里就不需要.as[AnotherClass]
了。
你当然可以跳过所有这些,只需要mapGroups
(尽管并非没有性能损失):
import shapeless.syntax.std.tuple._ // A little bit of shapeless
val tuples = myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.mapGroups((group, iter) => group :+ iter.map(_.c4).sum)
结果
+---+---+---+---+
| _1| _2| _3| _4|
+---+---+---+---+
| a| b| c|5.0|
+---+---+---+---+
reduceGroups
可能是更好的选择:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))
结果 Dataset
:
+-------+-----------+
| _1| _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+