为什么创建自定义案例的数据集时是"Unable to find encoder for type stored in a Dataset" class?
Why is "Unable to find encoder for type stored in a Dataset" when creating a dataset of custom case class?
带有 Scala 2.11.8 的 Spark 2.0(最终版)。以下超级简单的代码会产生编译错误 Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
import org.apache.spark.sql.SparkSession
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder.
master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
Spark Datasets
需要 Encoders
作为即将存储的数据类型。对于常见类型(原子、产品类型),有许多可用的预定义编码器,但您必须先从 SparkSession.implicits
导入这些编码器才能使其工作:
val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
或者您可以直接提供明确的
import org.apache.spark.sql.{Encoder, Encoders}
val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])
或隐式
implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)
Encoder
为存储类型。
请注意,Encoders
还提供了一些预定义的 Encoders
用于原子类型,Encoders
用于复杂类型,可以用 ExpressionEncoder
派生。
延伸阅读:
- 对于内置编码器未涵盖的自定义对象,请参阅
- 对于
Row
对象,您必须明确提供 Encoder
,如 所示
- 对于调试用例,用例 class 必须在 Main
之外定义
对于其他用户(你的是正确的),请注意 case class
在 object
范围之外定义也很重要。所以:
失败:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
添加隐式,仍然失败并出现相同的错误:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
作品:
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
这是相关的错误:https://issues.apache.org/jira/browse/SPARK-13540,所以希望它会在 Spark 2 的下一个版本中得到修复。
(编辑:看起来这个错误修复实际上在 Spark 2.0.0 中......所以我不确定为什么仍然失败)。
我会用我自己的问题的答案来澄清,如果目标是定义一个简单的文字 SparkData 框架,而不是使用 Scala 元组和隐式转换,更简单的方法是使用 Spark API 直接像这样:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val simpleSchema = StructType(
StructField("a", StringType) ::
StructField("b", IntegerType) ::
StructField("c", IntegerType) ::
StructField("d", IntegerType) ::
StructField("e", IntegerType) :: Nil)
val data = List(
Row("001", 1, 0, 3, 4),
Row("001", 3, 4, 1, 7),
Row("001", null, 0, 6, 4),
Row("003", 1, 4, 5, 7),
Row("003", 5, 4, null, 2),
Row("003", 4, null, 9, 2),
Row("003", 2, 3, 0, 1)
)
val df = spark.createDataFrame(data.asJava, simpleSchema)
带有 Scala 2.11.8 的 Spark 2.0(最终版)。以下超级简单的代码会产生编译错误 Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
import org.apache.spark.sql.SparkSession
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder.
master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
Spark Datasets
需要 Encoders
作为即将存储的数据类型。对于常见类型(原子、产品类型),有许多可用的预定义编码器,但您必须先从 SparkSession.implicits
导入这些编码器才能使其工作:
val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
或者您可以直接提供明确的
import org.apache.spark.sql.{Encoder, Encoders}
val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])
或隐式
implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)
Encoder
为存储类型。
请注意,Encoders
还提供了一些预定义的 Encoders
用于原子类型,Encoders
用于复杂类型,可以用 ExpressionEncoder
派生。
延伸阅读:
- 对于内置编码器未涵盖的自定义对象,请参阅
- 对于
Row
对象,您必须明确提供Encoder
,如 所示
- 对于调试用例,用例 class 必须在 Main
对于其他用户(你的是正确的),请注意 case class
在 object
范围之外定义也很重要。所以:
失败:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
添加隐式,仍然失败并出现相同的错误:
object DatasetTest {
case class SimpleTuple(id: Int, desc: String)
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
作品:
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
}
}
这是相关的错误:https://issues.apache.org/jira/browse/SPARK-13540,所以希望它会在 Spark 2 的下一个版本中得到修复。
(编辑:看起来这个错误修复实际上在 Spark 2.0.0 中......所以我不确定为什么仍然失败)。
我会用我自己的问题的答案来澄清,如果目标是定义一个简单的文字 SparkData 框架,而不是使用 Scala 元组和隐式转换,更简单的方法是使用 Spark API 直接像这样:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val simpleSchema = StructType(
StructField("a", StringType) ::
StructField("b", IntegerType) ::
StructField("c", IntegerType) ::
StructField("d", IntegerType) ::
StructField("e", IntegerType) :: Nil)
val data = List(
Row("001", 1, 0, 3, 4),
Row("001", 3, 4, 1, 7),
Row("001", null, 0, 6, 4),
Row("003", 1, 4, 5, 7),
Row("003", 5, 4, null, 2),
Row("003", 4, null, 9, 2),
Row("003", 2, 3, 0, 1)
)
val df = spark.createDataFrame(data.asJava, simpleSchema)