Spark 2.3 时间戳减去毫秒
Spark 2.3 timestamp subtract milliseconds
我正在使用 Spark 2.3,我已经阅读 它不支持时间戳毫秒(仅在 2.4+ 中),但我正在寻找有关如何做我需要做的事情的想法。
我正在处理的数据将日期存储为 Parquet 文件中的字符串数据类型,格式为:2021-07-09T01:41:58Z
我需要从中减去一毫秒。如果是 Spark 2.4,我想我可以这样做:
to_timestamp(col("sourceStartTimestamp")) - expr("INTERVAL 0.001 SECONDS")
但是因为它是 Spark 2.3,所以没有任何作用。我确认它可以减去 1 秒,但它会忽略任何小于一秒的值。
任何人都可以建议在 Spark 2.3 中如何执行此操作的解决方法吗?最终,如果有任何不同,结果将需要是 String 数据类型。
由于 Spark 2.3(或更低版本)不支持毫秒时间戳,请考虑使用采用增量毫秒和日期格式的 UDF 来使用 java.time
的 plusNanos()
:
def getMillisTS(delta: Long, fmt: String = "yyyy-MM-dd HH:mm:ss.SSS") = udf{
(ts: java.sql.Timestamp) =>
import java.time.format.DateTimeFormatter
ts.toLocalDateTime.plusNanos(delta * 1000000).format(DateTimeFormatter.ofPattern(fmt))
}
测试-运行 UDF:
val df = Seq("2021-01-01 00:00:00", "2021-02-15 12:30:00").toDF("ts")
df.withColumn("millisTS", getMillisTS(-1)($"ts")).show(false)
/*
+-------------------+-----------------------+
|ts |millisTS |
+-------------------+-----------------------+
|2021-01-01 00:00:00|2020-12-31 23:59:59.999|
|2021-02-15 12:30:00|2021-02-15 12:29:59.999|
+-------------------+-----------------------+
*/
df.withColumn("millisTS", getMillisTS(5000)($"ts")).show(false)
/*
+-------------------+-----------------------+
|ts |millisTS |
+-------------------+-----------------------+
|2021-01-01 00:00:00|2021-01-01 00:00:05.000|
|2021-02-15 12:30:00|2021-02-15 12:30:05.000|
+-------------------+-----------------------+
*/
val df = Seq("2021-01-01T00:00:00Z", "2021-02-15T12:30:00Z").toDF("ts")
df.withColumn(
"millisTS",
getMillisTS(-1, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")(to_timestamp($"ts", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
).show(false)
/*
+-------------------+------------------------+
|ts |millisTS |
+-------------------+------------------------+
|2021-01-01 00:00:00|2020-12-31T23:59:59.999Z|
|2021-02-15 12:30:00|2021-02-15T12:29:59.999Z|
+-------------------+------------------------+
*/
我正在使用 Spark 2.3,我已经阅读
我正在处理的数据将日期存储为 Parquet 文件中的字符串数据类型,格式为:2021-07-09T01:41:58Z
我需要从中减去一毫秒。如果是 Spark 2.4,我想我可以这样做:
to_timestamp(col("sourceStartTimestamp")) - expr("INTERVAL 0.001 SECONDS")
但是因为它是 Spark 2.3,所以没有任何作用。我确认它可以减去 1 秒,但它会忽略任何小于一秒的值。
任何人都可以建议在 Spark 2.3 中如何执行此操作的解决方法吗?最终,如果有任何不同,结果将需要是 String 数据类型。
由于 Spark 2.3(或更低版本)不支持毫秒时间戳,请考虑使用采用增量毫秒和日期格式的 UDF 来使用 java.time
的 plusNanos()
:
def getMillisTS(delta: Long, fmt: String = "yyyy-MM-dd HH:mm:ss.SSS") = udf{
(ts: java.sql.Timestamp) =>
import java.time.format.DateTimeFormatter
ts.toLocalDateTime.plusNanos(delta * 1000000).format(DateTimeFormatter.ofPattern(fmt))
}
测试-运行 UDF:
val df = Seq("2021-01-01 00:00:00", "2021-02-15 12:30:00").toDF("ts")
df.withColumn("millisTS", getMillisTS(-1)($"ts")).show(false)
/*
+-------------------+-----------------------+
|ts |millisTS |
+-------------------+-----------------------+
|2021-01-01 00:00:00|2020-12-31 23:59:59.999|
|2021-02-15 12:30:00|2021-02-15 12:29:59.999|
+-------------------+-----------------------+
*/
df.withColumn("millisTS", getMillisTS(5000)($"ts")).show(false)
/*
+-------------------+-----------------------+
|ts |millisTS |
+-------------------+-----------------------+
|2021-01-01 00:00:00|2021-01-01 00:00:05.000|
|2021-02-15 12:30:00|2021-02-15 12:30:05.000|
+-------------------+-----------------------+
*/
val df = Seq("2021-01-01T00:00:00Z", "2021-02-15T12:30:00Z").toDF("ts")
df.withColumn(
"millisTS",
getMillisTS(-1, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")(to_timestamp($"ts", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
).show(false)
/*
+-------------------+------------------------+
|ts |millisTS |
+-------------------+------------------------+
|2021-01-01 00:00:00|2020-12-31T23:59:59.999Z|
|2021-02-15 12:30:00|2021-02-15T12:29:59.999Z|
+-------------------+------------------------+
*/