在 spark-csv 数据帧 reader 中解析 Micro/Nano 秒时间戳:结果不一致

Parse Micro/Nano Seconds timestamp in spark-csv Dataframe reader : Inconsistent results

我正在尝试读取时间戳到纳秒的 csv 文件。 文件 TestTimestamp.csv-

的示例内容

spark- 2.4.0, scala - 2.11.11

   /**
     * TestTimestamp.csv -
     * 101,2019-SEP-23 11.42.35.456789123 AM
     *
     */

尝试使用 timestampFormat = "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa"

读取它
val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true)))

val data = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
      .schema(dataSchema)
      .load("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")

    data.select('Created_TS).show

我得到的输出是完全错误的日期时间。 9月23日改为9月28日

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 18:35:...|
+--------------------+

即使我有 24 小时格式的时间,例如 - “2019-SEP-23 16.42.35.456789123” 我尝试通过给出 timestampFormat = "yyyy-MMM-dd HH.mm.ss.SSS"

来仅使用第二个分数的前几位

类似的错误结果-

val data2 = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSS")
      .schema(dataSchema)
      .load("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")

    data2.select('Created_TS).show

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 23:35:...|
+--------------------+

在使用 csv reader 创建数据帧 时,有什么方法可以解析此类时间戳字符串?

解析日期的DataFrameReader uses the SimpleDateFormat

timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.

不幸的是,SimpleDateFormat 不支持纳秒,因此最后一个点之后的日期部分将被解释为 456789123 毫秒,大约 126 小时。这个时间被添加到你的日期,这解释了你看到的奇怪结果。有关此主题的更多详细信息,请参阅 this answer

因此必须在读取 csv 后的第二步中解析日期,例如使用使用 DateTimeFormatter:

的 udf
val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS_String", StringType, true)))

var df = spark.read.option("header", false)
  .option("inferSchema", "false")
  .option("treatEmptyValuesAsNulls", "true")
  .schema(dataSchema)
  .csv("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")

val toDate = udf((date: String) => {
  val formatter = new DateTimeFormatterBuilder()
    .parseCaseInsensitive()
    .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
  Timestamp.valueOf(LocalDateTime.parse(date, formatter))
})

df = df.withColumn("Created_TS", toDate('Created_TS_String))

这是受 werner 关于使用 udfs 的回答启发的解决方案..-

输入 csv -

101,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM

带有 TimestampType 列的原始架构

val orig_schema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true), StructField("Updated_TS", TimestampType, true), StructField("Modified_TS", TimestampType, true)))

将所有 TimestampType 转换为 StringType

val dataSchema = StructType(orig_schema.map(x =>
      {
        x.dataType match {
          case TimestampType => StructField(x.name, StringType, x.nullable)
          case _             => x
        }

      }))

用于将 String 转换为 Timstamp 的 toDate 函数

//TODO parameterize string formats

    def toDate(date: String): java.sql.Timestamp = {
      val formatter = new DateTimeFormatterBuilder()
        .parseCaseInsensitive()
        .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
      Timestamp.valueOf(LocalDateTime.parse(date, formatter))
    }

// register toDate as udf
val to_timestamp = spark.sqlContext.udf.register("to_timestamp", toDate _)

从原始数据帧 select 创建列表达式

// Array of Column Name & Types
    val nameType: Array[(String, DataType)] = orig_schema.fields.map(f => (f.name, f.dataType))

// Create Column Expression to select from raw Dataframe
val selectExpr = nameType.map(f => {
      f._2 match {
        case TimestampType => expr(s"CASE WHEN ${f._1} is NULL THEN NULL ELSE to_timestamp(${f._1}) END AS ${f._1}")
        case _             => expr(s"${f._1}")
      }
    })

读取为 StringType ,使用列 select 或使用 udf 将字符串转换为时间戳的表达式

val data = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
      .schema(dataSchema)
.load("C:\TestData\Raw\TetraPak\Shipments\TestTimestamp_new.csv").select(selectExpr: _*)

data.show

这是所需的输出..所以现在我不必担心列数和使用 udf 手动创建表达式

+-----+--------------------+--------------------+--------------------+
|   ID|          Created_TS|          Updated_TS|         Modified_TS|
+-----+--------------------+--------------------+--------------------+
|101.0|2019-09-23 11:42:...|2019-09-23 11:42:...|2019-09-23 11:42:...|
+-----+--------------------+--------------------+--------------------+