如何在 Spark SQL 中为自定义类型定义模式?
How to define schema for custom type in Spark SQL?
以下示例代码尝试将一些案例对象放入数据框中。该代码包括案例对象层次结构的定义和使用此特征的案例 class:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}
在执行代码的时候,不幸遇到了如下异常:
java.lang.UnsupportedOperationException: Schema for type Some is not supported
问题
- 是否可以为某些类型添加或定义模式(此处输入
Some
)?
- 是否存在另一种方法来表示这种枚举?
- 我试过直接用
Enumeration
,也没有成功。 (见下文)
Enumeration
的代码:
object Some extends Enumeration {
type Some = Value
val AType, BType = Value
}
提前致谢。我希望最好的方法是不要使用字符串。
Spark 2.0.0+:
UserDefinedType
已在 Spark 2.0.0 中设为私有,目前还没有 Dataset
友好的替代品。
参见:SPARK-14155 (Hide UserDefinedType in Spark 2.0)
大多数时候静态类型Dataset
可以作为替换
有一个待处理的 Jira SPARK-7768 再次使用目标版本 2.4 制作 UDT API public。
另见
Spark < 2.0.0
Is there a possibility to add or define a schema for certain types (here type Some)?
我想答案取决于您对它的需求程度。看起来可以创建一个 UserDefinedType
但它需要访问 DeveloperApi
并且不是很简单或有据可查。
import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some
class SomeUDT extends UserDefinedType[Some] {
override def sqlType: DataType = IntegerType
override def serialize(obj: Any) = {
obj match {
case AType => 0
case BType => 1
}
}
override def deserialize(datum: Any): Some = {
datum match {
case 0 => AType
case 1 => BType
}
}
override def userClass: Class[Some] = classOf[Some]
}
你应该也覆盖 hashCode
和 equals
。
它的 PySpark 对应物可能如下所示:
from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType
class SomeUDT(UserDefinedType):
@classmethod
def sqlType(self):
return IntegerType()
@classmethod
def module(cls):
return cls.__module__
@classmethod
def scalaUDT(cls): # Required in Spark < 1.5
return 'net.zero323.enum.SomeUDT'
def serialize(self, obj):
return obj.value
def deserialize(self, datum):
return {x.value: x for x in Some}[datum]
@unique
class Some(Enum):
__UDT__ = SomeUDT()
AType = 0
BType = 1
在 Spark < 1.5 中 Python UDT 需要配对的 Scala UDT,但在 1.5 中似乎不再如此。
对于像您这样的简单 UDT,您可以使用简单类型(例如 IntegerType
而不是整个 Struct
)。
以下示例代码尝试将一些案例对象放入数据框中。该代码包括案例对象层次结构的定义和使用此特征的案例 class:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}
在执行代码的时候,不幸遇到了如下异常:
java.lang.UnsupportedOperationException: Schema for type Some is not supported
问题
- 是否可以为某些类型添加或定义模式(此处输入
Some
)? - 是否存在另一种方法来表示这种枚举?
- 我试过直接用
Enumeration
,也没有成功。 (见下文)
- 我试过直接用
Enumeration
的代码:
object Some extends Enumeration {
type Some = Value
val AType, BType = Value
}
提前致谢。我希望最好的方法是不要使用字符串。
Spark 2.0.0+:
UserDefinedType
已在 Spark 2.0.0 中设为私有,目前还没有 Dataset
友好的替代品。
参见:SPARK-14155 (Hide UserDefinedType in Spark 2.0)
大多数时候静态类型Dataset
可以作为替换
有一个待处理的 Jira SPARK-7768 再次使用目标版本 2.4 制作 UDT API public。
另见
Spark < 2.0.0
Is there a possibility to add or define a schema for certain types (here type Some)?
我想答案取决于您对它的需求程度。看起来可以创建一个 UserDefinedType
但它需要访问 DeveloperApi
并且不是很简单或有据可查。
import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some
class SomeUDT extends UserDefinedType[Some] {
override def sqlType: DataType = IntegerType
override def serialize(obj: Any) = {
obj match {
case AType => 0
case BType => 1
}
}
override def deserialize(datum: Any): Some = {
datum match {
case 0 => AType
case 1 => BType
}
}
override def userClass: Class[Some] = classOf[Some]
}
你应该也覆盖 hashCode
和 equals
。
它的 PySpark 对应物可能如下所示:
from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType
class SomeUDT(UserDefinedType):
@classmethod
def sqlType(self):
return IntegerType()
@classmethod
def module(cls):
return cls.__module__
@classmethod
def scalaUDT(cls): # Required in Spark < 1.5
return 'net.zero323.enum.SomeUDT'
def serialize(self, obj):
return obj.value
def deserialize(self, datum):
return {x.value: x for x in Some}[datum]
@unique
class Some(Enum):
__UDT__ = SomeUDT()
AType = 0
BType = 1
在 Spark < 1.5 中 Python UDT 需要配对的 Scala UDT,但在 1.5 中似乎不再如此。
对于像您这样的简单 UDT,您可以使用简单类型(例如 IntegerType
而不是整个 Struct
)。