Spark SQL 的 Scala API - TimestampType - 未找到 org.apache.spark.sql.types.TimestampType 的编码器
Spark SQL's Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampType
我在 Databricks notebook 上使用 Spark 2.1 和 Scala 2.11
TimestampType 到底是什么?
我们从 SparkSQL's documentation 得知官方时间戳类型是 TimestampType,这显然是 java.sql.Timestamp :
的别名
TimestampType可以在SparkSQL的Scala中找到API
我们在使用模式和数据集时存在差异 API
解析时{"time":1469501297,"action":"Open"}
from the Databricks' Scala Structured Streaming example
使用 Json 模式 --> OK(我更喜欢使用优雅的数据集 API):
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
val staticInputDF =
spark
.read
.schema(jsonSchema)
.json(inputPath)
使用数据集API --> KO:未找到 TimestampType
的编码器
创建事件class
import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event
从数据块上的 DBFS 读取事件时出错。
注意:当使用 java.sql.Timestamp
作为 "time"
的类型时,我们不会收到错误
val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]
错误信息
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- root class:
TimestampType
不是 java.sql.Timestamp
的别名,而是 Spark 内部使用的时间戳类型的表示。通常,您不想在代码中使用 TimestampType
。这个想法是 java.sql.Timestamp
由 Spark SQL 本机支持,因此您可以按如下方式定义事件 class:
case class Event(action: String, time: java.sql.Timestamp)
在内部,Spark 将在编译和优化查询时使用 TimestampType
在运行时对值的类型建模,但这不是您大多数时候感兴趣的东西。
结合模式读取方法 .schema(jsonSchema)
和包含类型 java.sql.Timestamp
的 as[Type]
方法将解决这个问题。这个想法是在阅读 Structured Streaming 文档 Creating streaming DataFrames and streaming Datasets
之后产生的
These examples generate streaming DataFrames that are untyped, meaning
that the schema of the DataFrame is not checked at compile time, only
checked at runtime when the query is submitted. Some operations like
map, flatMap, etc. need the type to be known at compile time. To do
those, you can convert these untyped streaming DataFrames to typed
streaming Datasets using the same methods as static DataFrame.
val path = "/databricks-datasets/structured-streaming/events/"
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
case class Event(action: String, time: java.sql.Timestamp)
val staticInputDS =
spark
.read
.schema(jsonSchema)
.json(path)
.as[Event]
staticInputDF.printSchema
将输出:
root
|-- time: timestamp (nullable = true)
|-- action: string (nullable = true)
我在 Databricks notebook 上使用 Spark 2.1 和 Scala 2.11
TimestampType 到底是什么?
我们从 SparkSQL's documentation 得知官方时间戳类型是 TimestampType,这显然是 java.sql.Timestamp :
的别名TimestampType可以在SparkSQL的Scala中找到API
我们在使用模式和数据集时存在差异 API
解析时{"time":1469501297,"action":"Open"}
from the Databricks' Scala Structured Streaming example
使用 Json 模式 --> OK(我更喜欢使用优雅的数据集 API):
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
val staticInputDF =
spark
.read
.schema(jsonSchema)
.json(inputPath)
使用数据集API --> KO:未找到 TimestampType
的编码器创建事件class
import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event
从数据块上的 DBFS 读取事件时出错。
注意:当使用 java.sql.Timestamp
作为 "time"
val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]
错误信息
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- root class:
TimestampType
不是 java.sql.Timestamp
的别名,而是 Spark 内部使用的时间戳类型的表示。通常,您不想在代码中使用 TimestampType
。这个想法是 java.sql.Timestamp
由 Spark SQL 本机支持,因此您可以按如下方式定义事件 class:
case class Event(action: String, time: java.sql.Timestamp)
在内部,Spark 将在编译和优化查询时使用 TimestampType
在运行时对值的类型建模,但这不是您大多数时候感兴趣的东西。
结合模式读取方法 .schema(jsonSchema)
和包含类型 java.sql.Timestamp
的 as[Type]
方法将解决这个问题。这个想法是在阅读 Structured Streaming 文档 Creating streaming DataFrames and streaming Datasets
These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like map, flatMap, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame.
val path = "/databricks-datasets/structured-streaming/events/"
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
case class Event(action: String, time: java.sql.Timestamp)
val staticInputDS =
spark
.read
.schema(jsonSchema)
.json(path)
.as[Event]
staticInputDF.printSchema
将输出:
root
|-- time: timestamp (nullable = true)
|-- action: string (nullable = true)