Spark 中的时间戳格式和时区 (scala API)
Timestamp formats and time zones in Spark (scala API)
******* 更新 ********
按照评论中的建议,我删除了代码中不相关的部分:
我的要求:
- 将毫秒数统一为 3
- 将字符串转换为时间戳并将值保留为 UTC
创建数据框:
val df = Seq("2018-09-02T05:05:03.456Z","2018-09-02T04:08:32.1Z","2018-09-02T05:05:45.65Z").toDF("Timestamp")
这里是使用 spark shell:
的结果
************ 结束更新 ******************************* **
我在尝试使用 scala 处理 Spark 中的时区和时间戳格式时感到非常头疼。
这是我的脚本的简化版,用于解释我的问题:
import org.apache.spark.sql.functions._
val jsonRDD = sc.wholeTextFiles("file:///data/home2/phernandez/vpp/Test_Message.json")
val jsonDF = spark.read.json(jsonRDD.map(f => f._2))
这是生成的架构:
root
|-- MeasuredValues: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- MeasuredValue: double (nullable = true)
| | |-- Status: long (nullable = true)
| | |-- Timestamp: string (nullable = true)
那我就select时间戳字段如下
jsonDF.select(explode($"MeasuredValues").as("Values")).select($"Values.Timestamp").show(5,false)
我首先要解决的是每个时间戳的毫秒数,并将其统一为三个。
我应用了 date_format 如下
jsonDF.select(explode($"MeasuredValues").as("Values")).select(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")).show(5,false)
固定了毫秒格式,但时间戳从 UTC 转换为本地时间。
为了解决这个问题,我应用了 to_utc_timestamp 和我当地的时区。
jsonDF.select(explode($"MeasuredValues").as("Values")).select(to_utc_timestamp(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),"Europe/Berlin").as("Timestamp")).show(5,false)
更糟糕的是,没有返回 UTC 值,并且丢失了毫秒格式。
任何想法如何处理这个?我会感激的
BR。保罗
问题的原因是转换使用的时间格式字符串:
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
如您所见,Z
在单引号内,这意味着它不被解释为区域偏移标记,而只是像中间的T
这样的字符。
因此,格式字符串应更改为
yyyy-MM-dd'T'HH:mm:ss.SSSX
其中 X
是 Java 标准日期时间格式化程序模式(Z
是 0 的偏移值)。
现在,源数据可以转换为 UTC 时间戳:
val srcDF = Seq(
("2018-04-10T13:30:34.45Z"),
("2018-04-10T13:45:55.4Z"),
("2018-04-10T14:00:00.234Z"),
("2018-04-10T14:15:04.34Z"),
("2018-04-10T14:30:23.45Z")
).toDF("Timestamp")
val convertedDF = srcDF.select(to_utc_timestamp(date_format($"Timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSX"), "Europe/Berlin").as("converted"))
convertedDF.printSchema()
convertedDF.show(false)
/**
root
|-- converted: timestamp (nullable = true)
+-----------------------+
|converted |
+-----------------------+
|2018-04-10 13:30:34.45 |
|2018-04-10 13:45:55.4 |
|2018-04-10 14:00:00.234|
|2018-04-10 14:15:04.34 |
|2018-04-10 14:30:23.45 |
+-----------------------+
*/
如果您需要将时间戳转换回字符串并将值规范化为具有 3 个尾随零,则应该有另一个 date_format
调用,类似于您已经在问题中应用的调用。
******* 更新 ********
按照评论中的建议,我删除了代码中不相关的部分:
我的要求:
- 将毫秒数统一为 3
- 将字符串转换为时间戳并将值保留为 UTC
创建数据框:
val df = Seq("2018-09-02T05:05:03.456Z","2018-09-02T04:08:32.1Z","2018-09-02T05:05:45.65Z").toDF("Timestamp")
这里是使用 spark shell:
的结果************ 结束更新 ******************************* **
我在尝试使用 scala 处理 Spark 中的时区和时间戳格式时感到非常头疼。
这是我的脚本的简化版,用于解释我的问题:
import org.apache.spark.sql.functions._
val jsonRDD = sc.wholeTextFiles("file:///data/home2/phernandez/vpp/Test_Message.json")
val jsonDF = spark.read.json(jsonRDD.map(f => f._2))
这是生成的架构:
root
|-- MeasuredValues: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- MeasuredValue: double (nullable = true)
| | |-- Status: long (nullable = true)
| | |-- Timestamp: string (nullable = true)
那我就select时间戳字段如下
jsonDF.select(explode($"MeasuredValues").as("Values")).select($"Values.Timestamp").show(5,false)
我首先要解决的是每个时间戳的毫秒数,并将其统一为三个。
我应用了 date_format 如下
jsonDF.select(explode($"MeasuredValues").as("Values")).select(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")).show(5,false)
固定了毫秒格式,但时间戳从 UTC 转换为本地时间。
为了解决这个问题,我应用了 to_utc_timestamp 和我当地的时区。
jsonDF.select(explode($"MeasuredValues").as("Values")).select(to_utc_timestamp(date_format($"Values.Timestamp","yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),"Europe/Berlin").as("Timestamp")).show(5,false)
更糟糕的是,没有返回 UTC 值,并且丢失了毫秒格式。
任何想法如何处理这个?我会感激的
BR。保罗
问题的原因是转换使用的时间格式字符串:
yyyy-MM-dd'T'HH:mm:ss.SSS'Z'
如您所见,Z
在单引号内,这意味着它不被解释为区域偏移标记,而只是像中间的T
这样的字符。
因此,格式字符串应更改为
yyyy-MM-dd'T'HH:mm:ss.SSSX
其中 X
是 Java 标准日期时间格式化程序模式(Z
是 0 的偏移值)。
现在,源数据可以转换为 UTC 时间戳:
val srcDF = Seq(
("2018-04-10T13:30:34.45Z"),
("2018-04-10T13:45:55.4Z"),
("2018-04-10T14:00:00.234Z"),
("2018-04-10T14:15:04.34Z"),
("2018-04-10T14:30:23.45Z")
).toDF("Timestamp")
val convertedDF = srcDF.select(to_utc_timestamp(date_format($"Timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSX"), "Europe/Berlin").as("converted"))
convertedDF.printSchema()
convertedDF.show(false)
/**
root
|-- converted: timestamp (nullable = true)
+-----------------------+
|converted |
+-----------------------+
|2018-04-10 13:30:34.45 |
|2018-04-10 13:45:55.4 |
|2018-04-10 14:00:00.234|
|2018-04-10 14:15:04.34 |
|2018-04-10 14:30:23.45 |
+-----------------------+
*/
如果您需要将时间戳转换回字符串并将值规范化为具有 3 个尾随零,则应该有另一个 date_format
调用,类似于您已经在问题中应用的调用。