如何在 spark-avro 2.4 模式中设置 logicalType?
How can I set a logicalType in a spark-avro 2.4 schema?
我们在应用程序中从 avro 文件中读取时间戳信息。我正在测试从 Spark 2.3.1 到 Spark 2.4 的升级,其中包括新内置的 spark-avro 集成。但是,我无法弄清楚如何告诉 avro 模式我希望时间戳的逻辑类型为 "timestamp-millis" 而不是默认的 "timestamp-micros".
仅使用 Databricks spark-avro 4.0.0 包查看 Spark 2.3.1 下的测试 avro 文件,我们得到以下 fields/schema:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
其中的 searchTime 自纪元存储为 long 以来经过了毫秒数。一切都很好。
当我升级到 Spark 2.4 和内置的 spark-avro 2.4.0 包时,我有这些更新的 fields/schema:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
如您所见,基础类型仍然是 long,但现在增加了 "timestamp-micros" 的 logicalType。这与 the release notes 所说的完全一样,但是,我找不到指定模式以使用 'timestamp-millis' 选项的方法。
这就成了一个问题,当我向 avro 文件写入一个时间戳对象时,该对象初始化为纪元后 10,000 秒,它会被读回为 10,000,000 秒。在2.3.1/databricks-avro下,只是一个long,没有关联信息,所以进去就出来了。
我们目前通过反映感兴趣的对象来构建一个模式,如下所示:
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]
我尝试通过创建一个修改过的模式来增强这一点,该模式试图替换与 searchTime 条目对应的 StructField,如下所示:
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
但是,spark.sql.types 中定义的 StructField 对象没有可以扩充其中数据类型的逻辑类型的概念。
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty)
我还尝试通过两种方式从 JSON 表示创建模式:
val schemaJSONrepr = """{
| "name" : "id",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchQuery",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchTime",
| "type" : "long",
| "logicalType" : "timestamp-millis",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "score",
| "type" : "double",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "searchType",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }""".stripMargin
第一次尝试只是从中创建一个数据类型
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
.schema(schema)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
失败是因为它无法为 searchTime 节点创建 StructType,因为其中包含 "logicalType"。第二次尝试是通过传入原始 JSON 字符串来简单地创建模式。
spark.read
.schema(schemaJSONrepr)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
这没有说明:
mismatched input '{' expecting {'SELECT', 'FROM', ...
== SQL ==
{
^^^
我发现 spark-avro API 中有一种方法可以从模式中获取逻辑类型,但不知道如何设置。
正如您在上面看到我失败的尝试,我尝试使用 Schema.Parser 创建 avro 模式对象,但 spark.read.schema 唯一接受的类型是 String 和 StructType。
如果有人可以提供有关如何 change/specify 这个 logicalType 的见解,我将非常感激。谢谢
好的,我想我回答了我自己的问题。当我修改以编程方式构建的架构以使用显式时间戳类型时
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
当我们读出一个 Row 对象时,我没有改变逻辑。最初我们会读取一个 Long 并将其转换为一个时间戳,这是出了问题的地方,因为它正在读回 Long 为微秒,这会使它比我们预期的大 1,000 倍。将我们的读取更改为直接读取 Timestamp 对象,让底层逻辑解释这一点,将其从我们(我)手中夺走。所以:
// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN
searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS
我们在应用程序中从 avro 文件中读取时间戳信息。我正在测试从 Spark 2.3.1 到 Spark 2.4 的升级,其中包括新内置的 spark-avro 集成。但是,我无法弄清楚如何告诉 avro 模式我希望时间戳的逻辑类型为 "timestamp-millis" 而不是默认的 "timestamp-micros".
仅使用 Databricks spark-avro 4.0.0 包查看 Spark 2.3.1 下的测试 avro 文件,我们得到以下 fields/schema:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
其中的 searchTime 自纪元存储为 long 以来经过了毫秒数。一切都很好。
当我升级到 Spark 2.4 和内置的 spark-avro 2.4.0 包时,我有这些更新的 fields/schema:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
如您所见,基础类型仍然是 long,但现在增加了 "timestamp-micros" 的 logicalType。这与 the release notes 所说的完全一样,但是,我找不到指定模式以使用 'timestamp-millis' 选项的方法。
这就成了一个问题,当我向 avro 文件写入一个时间戳对象时,该对象初始化为纪元后 10,000 秒,它会被读回为 10,000,000 秒。在2.3.1/databricks-avro下,只是一个long,没有关联信息,所以进去就出来了。
我们目前通过反映感兴趣的对象来构建一个模式,如下所示:
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]
我尝试通过创建一个修改过的模式来增强这一点,该模式试图替换与 searchTime 条目对应的 StructField,如下所示:
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
但是,spark.sql.types 中定义的 StructField 对象没有可以扩充其中数据类型的逻辑类型的概念。
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty)
我还尝试通过两种方式从 JSON 表示创建模式:
val schemaJSONrepr = """{
| "name" : "id",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchQuery",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchTime",
| "type" : "long",
| "logicalType" : "timestamp-millis",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "score",
| "type" : "double",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "searchType",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }""".stripMargin
第一次尝试只是从中创建一个数据类型
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
.schema(schema)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
失败是因为它无法为 searchTime 节点创建 StructType,因为其中包含 "logicalType"。第二次尝试是通过传入原始 JSON 字符串来简单地创建模式。
spark.read
.schema(schemaJSONrepr)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
这没有说明:
mismatched input '{' expecting {'SELECT', 'FROM', ...
== SQL ==
{
^^^
我发现 spark-avro API 中有一种方法可以从模式中获取逻辑类型,但不知道如何设置。
正如您在上面看到我失败的尝试,我尝试使用 Schema.Parser 创建 avro 模式对象,但 spark.read.schema 唯一接受的类型是 String 和 StructType。
如果有人可以提供有关如何 change/specify 这个 logicalType 的见解,我将非常感激。谢谢
好的,我想我回答了我自己的问题。当我修改以编程方式构建的架构以使用显式时间戳类型时
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
当我们读出一个 Row 对象时,我没有改变逻辑。最初我们会读取一个 Long 并将其转换为一个时间戳,这是出了问题的地方,因为它正在读回 Long 为微秒,这会使它比我们预期的大 1,000 倍。将我们的读取更改为直接读取 Timestamp 对象,让底层逻辑解释这一点,将其从我们(我)手中夺走。所以:
// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN
searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS