Spark SQL 的 Scala API - TimestampType - 未找到 org.apache.spark.sql.types.TimestampType 的编码器

Spark SQL's Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampType

我在 Databricks notebook 上使用 Spark 2.1 和 Scala 2.11

TimestampType 到底是什么?

我们从 SparkSQL's documentation 得知官方时间戳类型是 TimestampType,这显然是 java.sql.Timestamp :

的别名

TimestampType可以在SparkSQL的Scala中找到API

我们在使用模式和数据集时存在差异 API

解析时{"time":1469501297,"action":"Open"}from the Databricks' Scala Structured Streaming example

使用 Json 模式 --> OK(我更喜欢使用优雅的数据集 API):

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

val staticInputDF = 
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)

使用数据集API --> KO:未找到 TimestampType

的编码器

创建事件class

import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event

从数据块上的 DBFS 读取事件时出错。

注意:当使用 java.sql.Timestamp 作为 "time"

的类型时,我们不会收到错误
val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]

错误信息

java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- root class: 

TimestampType 不是 java.sql.Timestamp 的别名,而是 Spark 内部使用的时间戳类型的表示。通常,您不想在代码中使用 TimestampType。这个想法是 java.sql.Timestamp 由 Spark SQL 本机支持,因此您可以按如下方式定义事件 class:

case class Event(action: String, time: java.sql.Timestamp)

在内部,Spark 将在编译和优化查询时使用 TimestampType 在运行时对值的类型建模,但这不是您大多数时候感兴趣的东西。

结合模式读取方法 .schema(jsonSchema) 和包含类型 java.sql.Timestampas[Type] 方法将解决这个问题。这个想法是在阅读 Structured Streaming 文档 Creating streaming DataFrames and streaming Datasets

之后产生的

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like map, flatMap, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame.

val path = "/databricks-datasets/structured-streaming/events/"

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

case class Event(action: String, time: java.sql.Timestamp)

val staticInputDS = 
  spark
    .read
    .schema(jsonSchema)
    .json(path)
    .as[Event]

staticInputDF.printSchema

将输出:

root
 |-- time: timestamp (nullable = true)
 |-- action: string (nullable = true)