如何在 spark-avro 2.4 模式中设置 logicalType?

How can I set a logicalType in a spark-avro 2.4 schema?

我们在应用程序中从 avro 文件中读取时间戳信息。我正在测试从 Spark 2.3.1 到 Spark 2.4 的升级,其中包括新内置的 spark-avro 集成。但是,我无法弄清楚如何告诉 avro 模式我希望时间戳的逻辑类型为 "timestamp-millis" 而不是默认的 "timestamp-micros".

仅使用 Databricks spark-avro 4.0.0 包查看 Spark 2.3.1 下的测试 avro 文件,我们得到以下 fields/schema:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

其中的 searchTime 自纪元存储为 long 以来经过了毫秒数。一切都很好。

当我升级到 Spark 2.4 和内置的 spark-avro 2.4.0 包时,我有这些更新的 fields/schema:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

如您所见,基础类型仍然是 long,但现在增加了 "timestamp-micros" 的 logicalType。这与 the release notes 所说的完全一样,但是,我找不到指定模式以使用 'timestamp-millis' 选项的方法。

这就成了一个问题,当我向 avro 文件写入一个时间戳对象时,该对象初始化为纪元后 10,000 秒,它会被读回为 10,000,000 秒。在2.3.1/databricks-avro下,只是一个long,没有关联信息,所以进去就出来了。

我们目前通过反映感兴趣的对象来构建一个模式,如下所示:

val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]

我尝试通过创建一个修改过的模式来增强这一点,该模式试图替换与 searchTime 条目对应的 StructField,如下所示:

    val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

但是,spark.sql.types 中定义的 StructField 对象没有可以扩充其中数据类型的逻辑类型的概念。

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) 

我还尝试通过两种方式从 JSON 表示创建模式:

val schemaJSONrepr = """{
          |          "name" : "id",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchQuery",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchTime",
          |          "type" : "long",
          |          "logicalType" : "timestamp-millis",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "score",
          |          "type" : "double",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchType",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }""".stripMargin

第一次尝试只是从中创建一个数据类型

// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
     .schema(schema)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

失败是因为它无法为 searchTime 节点创建 StructType,因为其中包含 "logicalType"。第二次尝试是通过传入原始 JSON 字符串来简单地创建模式。

spark.read
     .schema(schemaJSONrepr)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

这没有说明:

mismatched input '{' expecting {'SELECT', 'FROM', ...

== SQL ==

{
^^^

我发现 spark-avro API 中有一种方法可以从模式中获取逻辑类型,但不知道如何设置。

正如您在上面看到我失败的尝试,我尝试使用 Schema.Parser 创建 avro 模式对象,但 spark.read.schema 唯一接受的类型是 String 和 StructType。

如果有人可以提供有关如何 change/specify 这个 logicalType 的见解,我将非常感激。谢谢

好的,我想我回答了我自己的问题。当我修改以编程方式构建的架构以使用显式时间戳类型时

val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

当我们读出一个 Row 对象时,我没有改变逻辑。最初我们会读取一个 Long 并将其转换为一个时间戳,这是出​​了问题的地方,因为它正在读回 Long 为微秒,这会使它比我们预期的大 1,000 倍。将我们的读取更改为直接读取 Timestamp 对象,让底层逻辑解释这一点,将其从我们(我)手中夺走。所以:

// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN

searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS