Column type inferred as binary with typed UDAF
I am trying to implement a typed UDAF that returns a complex type. Somehow Spark cannot infer the type of the result column, makes it binary, and puts the serialized data there. Here is a minimal example that reproduces the problem:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{SparkSession, Encoder, Encoders}
case class Data(key: Int)
class NoopAgg[I] extends Aggregator[I, Map[String, Int], Map[String, Int]] {
  override def zero: Map[String, Int] = Map.empty[String, Int]
  override def reduce(b: Map[String, Int], a: I): Map[String, Int] = b
  override def merge(b1: Map[String, Int], b2: Map[String, Int]): Map[String, Int] = b1
  override def finish(reduction: Map[String, Int]): Map[String, Int] = reduction
  override def bufferEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
  override def outputEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
}
object Question {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    val sc = spark.sparkContext

    import spark.implicits._

    val ds = sc.parallelize((1 to 10).map(i => Data(i))).toDS()
    val noop = new NoopAgg[Data]().toColumn
    val result = ds.groupByKey(_.key).agg(noop.as("my_sum").as[Map[String, Int]])
    result.printSchema()
  }
}
It prints:
root
 |-- value: integer (nullable = false)
 |-- my_sum: binary (nullable = true)
There is no inference going on here at all. Instead, you get more or less what you asked for. The specific mistake is here:
override def outputEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]
Encoders.kryo means that you apply general-purpose serialization and return a binary blob. The misleading part is .as[Map[String, Int]], which, contrary to what one might expect, is not checked statically. Worse still, it is not even proactively validated by the query planner, and a runtime exception is thrown only when result is evaluated:
result.first
org.apache.spark.sql.AnalysisException: cannot resolve '`my_sum`' due to data type mismatch: cannot cast binary to map<string,int>;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:115)
...
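The binary column can be traced back to the encoder itself rather than to anything in the aggregation. A minimal sketch that makes this visible (the output in the comments is what I would expect from a Kryo encoder, not captured from a run):

import org.apache.spark.sql.Encoders

// A Kryo-based encoder exposes a single BinaryType field named "value",
// so any column produced through it shows up as binary in the schema.
println(Encoders.kryo[Map[String, Int]].schema.treeString)
// root
//  |-- value: binary (nullable = true)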
You should provide a specific Encoder instead, either explicitly:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
def outputEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()
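Put together, the corrected aggregator could look like the sketch below (the class name TypedMapAgg is mine; only outputEncoder changes relative to the original, since the Kryo buffer encoder affects only the intermediate buffer, never the result schema):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

class TypedMapAgg[I] extends Aggregator[I, Map[String, Int], Map[String, Int]] {
  override def zero: Map[String, Int] = Map.empty[String, Int]
  override def reduce(b: Map[String, Int], a: I): Map[String, Int] = b
  override def merge(b1: Map[String, Int], b2: Map[String, Int]): Map[String, Int] = b1
  override def finish(reduction: Map[String, Int]): Map[String, Int] = reduction

  // The buffer may remain Kryo-serialized; it never surfaces in the output schema.
  override def bufferEncoder: Encoder[Map[String, Int]] = Encoders.kryo[Map[String, Int]]

  // ExpressionEncoder() derives a proper map<string,int> schema for the result column.
  override def outputEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()
}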
or implicitly:
class NoopAgg[I](implicit val enc: Encoder[Map[String, Int]]) extends Aggregator[I, Map[String, Int], Map[String, Int]] {
  ...
  override def outputEncoder: Encoder[Map[String, Int]] = enc
}
As a side effect, this makes as[Map[String, Int]] obsolete, because the return type of the Aggregator is already known.
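For illustration, usage might then look like the sketch below (the encoder is passed explicitly to keep the example self-contained, and the schema in the comments is what I would expect rather than captured output):

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Supply the output encoder for the implicit constructor parameter explicitly.
val noop = new NoopAgg[Data]()(ExpressionEncoder()).toColumn

// name(...) keeps the TypedColumn type, so no .as[Map[String, Int]] cast is needed.
val result = ds.groupByKey(_.key).agg(noop.name("my_sum"))
result.printSchema()
// root
//  |-- value: integer (nullable = false)
//  |-- my_sum: map (nullable = true)
//  |    |-- key: string
//  |    |-- value: integer (valueContainsNull = false)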