如何在 Spark SQL 中为自定义类型定义模式?

How to define schema for custom type in Spark SQL?

以下示例代码尝试将一些案例对象放入数据框中。该代码包括案例对象层次结构的定义和使用此特征的案例 class:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String, t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
    df.show()
  }
}    

在执行代码的时候,不幸遇到了如下异常:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

问题

Enumeration 的代码:

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}

提前致谢。我希望最好的方法是不要使用字符串。

Spark 2.0.0+

UserDefinedType 已在 Spark 2.0.0 中设为私有,目前还没有 Dataset 友好的替代品。

参见:SPARK-14155 (Hide UserDefinedType in Spark 2.0)

大多数时候静态类型Dataset可以作为替换 有一个待处理的 Jira SPARK-7768 再次使用目标版本 2.4 制作 UDT API public。

另见

Spark < 2.0.0

Is there a possibility to add or define a schema for certain types (here type Some)?

我想答案取决于您对它的需求程度。看起来可以创建一个 UserDefinedType 但它需要访问 DeveloperApi 并且不是很简单或有据可查。

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}

你应该也覆盖 hashCodeequals

它的 PySpark 对应物可能如下所示:

from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

在 Spark < 1.5 中 Python UDT 需要配对的 Scala UDT,但在 1.5 中似乎不再如此。

对于像您这样的简单 UDT,您可以使用简单类型(例如 IntegerType 而不是整个 Struct)。