GroupBy + custom aggregation on Dataset with Case class / Trait in the Key
I am trying to refactor some code and move the common logic into a trait. I basically want to process Datasets, group them by some key, and aggregate:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{ Dataset, Encoder, Encoders, TypedColumn }

case class SomeKey(a: String, b: Boolean)

case class InputRow(
  key: SomeKey,
  v: Double
)

trait MyTrait {

  def processInputs: Dataset[InputRow]

  def groupAndAggregate(
    logs: Dataset[InputRow]
  ): Dataset[(SomeKey, Long)] = {
    import logs.sparkSession.implicits._
    logs
      .groupByKey(i => i.key)
      .agg(someAggFunc)
  }

  // Whatever agg function: here, it counts the number of v that are >= 0.5
  def someAggFunc: TypedColumn[InputRow, Long] =
    new Aggregator[
      /* input type */ InputRow,
      /* "buffer" type */ Long,
      /* output type */ Long
    ] with Serializable {
      def zero = 0L
      def reduce(b: Long, a: InputRow) =
        if (a.v >= 0.5) b + 1 else b
      def merge(b1: Long, b2: Long) = b1 + b2
      // map buffer to output type
      def finish(b: Long) = b
      def bufferEncoder: Encoder[Long] = Encoders.scalaLong
      def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }.toColumn
}
Everything works fine: I can instantiate a class that inherits from MyTrait and override the way the inputs are processed:
import org.apache.spark.sql.DataFrame
import spark.implicits._

case class MyTraitTest(testDf: DataFrame) extends MyTrait {
  override def processInputs: Dataset[InputRow] = {
    val ds = testDf
      .select($"a", $"b", $"v")
      .rdd
      .map(r =>
        InputRow(
          SomeKey(r.getAs[String]("a"), r.getAs[Boolean]("b")),
          r.getAs[Double]("v")
        )
      )
      .toDS
    ds
  }
}
val df: DataFrame = Seq(
  ("1", false, 0.40),
  ("1", false, 0.54),
  ("0", true, 0.85),
  ("1", true, 0.39)
).toDF("a", "b", "v")

val myTraitTest = MyTraitTest(df)
val ds: Dataset[InputRow] = myTraitTest.processInputs
val res = myTraitTest.groupAndAggregate(ds)
res.show(false)
+----------+----------------------------------+
|key       |InputRow                          |
+----------+----------------------------------+
|[1, false]|1                                 |
|[0, true] |1                                 |
|[1, true] |0                                 |
+----------+----------------------------------+
Now the problem: I would like SomeKey to derive from a more generic trait Key, because the key will not always have exactly two fields, the fields will not have the same types, and so on. It will always be a simple tuple of basic primitive types, though.
So I tried the following:
trait Key extends Product

case class SomeKey(a: String, b: Boolean) extends Key
case class SomeOtherKey(x: Int, y: Boolean, z: String) extends Key

case class InputRow[T <: Key](
  key: T,
  v: Double
)

trait MyTrait[T <: Key] {

  def processInputs: Dataset[InputRow[T]]

  def groupAndAggregate(
    logs: Dataset[InputRow[T]]
  ): Dataset[(T, Long)] = {
    import logs.sparkSession.implicits._
    logs
      .groupByKey(i => i.key)
      .agg(someAggFunc)
  }

  def someAggFunc: TypedColumn[InputRow[T], Long] = { ... }
}
I now do:
case class MyTraitTest(testDf: DataFrame) extends MyTrait[SomeKey] {
  override def processInputs: Dataset[InputRow[SomeKey]] = {
    ...
  }
}

and so on.
But now I get the error: Unable to find encoder for type T. An implicit Encoder[T] is needed to store T instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

on the line:
.groupByKey(i => i.key)
I really don't know how to work around this; I have tried many things without success. Sorry for the long description, but hopefully you have everything you need to help me understand... thanks!
Spark needs to be able to implicitly create the encoder for the product type T, so you need to help it work around JVM type erasure by passing a TypeTag for T as an implicit parameter of the groupAndAggregate method.
A working example:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{ DataFrame, Dataset, Encoders, TypedColumn }
import scala.reflect.runtime.universe.TypeTag

trait Key extends Product

case class SomeKey(a: String, b: Boolean) extends Key
case class SomeOtherKey(x: Int, y: Boolean, z: String) extends Key

case class InputRow[T <: Key](key: T, v: Double)

trait MyTrait[T <: Key] {

  def processInputs: Dataset[InputRow[T]]

  def groupAndAggregate(
    logs: Dataset[InputRow[T]]
  )(implicit tTypeTag: TypeTag[T]): Dataset[(T, Long)] = {
    import logs.sparkSession.implicits._
    logs
      .groupByKey(i => i.key)
      .agg(someAggFunc)
  }

  def someAggFunc: TypedColumn[InputRow[T], Long] =
    new Aggregator[InputRow[T], Long, Long] with Serializable {
      def reduce(b: Long, a: InputRow[T]) = b + (a.v * 100).toLong
      def merge(b1: Long, b2: Long) = b1 + b2
      def zero = 0L
      def finish(b: Long) = b
      def bufferEncoder = Encoders.scalaLong
      def outputEncoder = Encoders.scalaLong
    }.toColumn
}
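
A side note, not part of the fix itself: since groupByKey is the only call in groupAndAggregate that actually needs an encoder for T (the TypedColumn returned by someAggFunc carries its own buffer and output encoders), you can equivalently ask for the key encoder directly instead of a TypeTag. A minimal sketch, under the hypothetical name MyTraitAlt and assuming the Key / InputRow definitions above:

import org.apache.spark.sql.{ Dataset, Encoder, TypedColumn }

// Hypothetical variation: take the key encoder itself instead of a TypeTag.
trait MyTraitAlt[T <: Key] {

  def processInputs: Dataset[InputRow[T]]

  // Encoder[T] is only needed by groupByKey; agg reuses the key encoder
  // and the encoders already attached to the TypedColumn.
  def groupAndAggregate(
    logs: Dataset[InputRow[T]]
  )(implicit keyEncoder: Encoder[T]): Dataset[(T, Long)] =
    logs
      .groupByKey(i => i.key)
      .agg(someAggFunc)

  def someAggFunc: TypedColumn[InputRow[T], Long]
}

At a concrete call site such as MyTraitAlt[SomeKey], import spark.implicits._ derives Encoder[SomeKey] automatically, just as it does from the TypeTag in the version above.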
With the wrapper class:
case class MyTraitTest(testDf: DataFrame) extends MyTrait[SomeKey] {
  import testDf.sparkSession.implicits._
  import org.apache.spark.sql.functions.struct

  override def processInputs = testDf
    .select(struct($"a", $"b") as "key", $"v")
    .as[InputRow[SomeKey]]
}
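
Note that struct($"a", $"b") keeps the source column names, which is what lets .as[InputRow[SomeKey]] match the struct fields to SomeKey's constructor parameters by name. If you would rather stay in the typed API end to end, here is a sketch of the same conversion (my own variation, not part of the answer) using a typed select plus map:

case class MyTraitTest(testDf: DataFrame) extends MyTrait[SomeKey] {
  import testDf.sparkSession.implicits._

  // The typed select yields a Dataset[(String, Boolean, Double)], which
  // map then reshapes into InputRow[SomeKey] without going through a struct.
  override def processInputs: Dataset[InputRow[SomeKey]] = testDf
    .select($"a".as[String], $"b".as[Boolean], $"v".as[Double])
    .map { case (a, b, v) => InputRow(SomeKey(a, b), v) }
}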
And the test execution:
val df = Seq(
  ("1", false, 0.40),
  ("1", false, 0.54),
  ("0", true, 0.85),
  ("1", true, 0.39)
).toDF("a", "b", "v")

val myTraitTest = MyTraitTest(df)
val ds = myTraitTest.processInputs
val res = myTraitTest.groupAndAggregate(ds)
res.show(false)
+----------+---------------------------------------------+
|key       |$anon($line5460910223.$read$$iw$$iw$InputRow)|
+----------+---------------------------------------------+
|[1, false]|94                                           |
|[1, true] |39                                           |
|[0, true] |85                                           |
+----------+---------------------------------------------+
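
One last note of my own: the aggregator in this answer sums (a.v * 100).toLong rather than counting, which is why these numbers differ from the question's output. The question's original "count the v that are >= 0.5" logic drops into the generic trait unchanged:

// Drop-in body for someAggFunc inside MyTrait[T]: the counting logic
// from the question, untouched by the generic key type.
def someAggFunc: TypedColumn[InputRow[T], Long] =
  new Aggregator[InputRow[T], Long, Long] with Serializable {
    def zero = 0L
    def reduce(b: Long, a: InputRow[T]) = if (a.v >= 0.5) b + 1 else b
    def merge(b1: Long, b2: Long) = b1 + b2
    def finish(b: Long) = b
    def bufferEncoder = Encoders.scalaLong
    def outputEncoder = Encoders.scalaLong
  }.toColumn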