在 spark-csv 数据帧 reader 中解析 Micro/Nano 秒时间戳:结果不一致
Parse Micro/Nano Seconds timestamp in spark-csv Dataframe reader : Inconsistent results
我正在尝试读取时间戳到纳秒的 csv 文件。
文件 TestTimestamp.csv-
的示例内容
spark- 2.4.0, scala - 2.11.11
/**
* TestTimestamp.csv -
* 101,2019-SEP-23 11.42.35.456789123 AM
*
*/
尝试使用 timestampFormat = "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa"
读取它
val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true)))
val data = spark.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
//.option("nullValue", "")
.option("dateFormat", "yyyy-MMM-dd")
.option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
.schema(dataSchema)
.load("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")
data.select('Created_TS).show
我得到的输出是完全错误的日期时间。 9月23日改为9月28日
+--------------------+
| Created_TS|
+--------------------+
|2019-09-28 18:35:...|
+--------------------+
即使我有 24 小时格式的时间,例如 -
“2019-SEP-23 16.42.35.456789123”
我尝试通过给出 timestampFormat = "yyyy-MMM-dd HH.mm.ss.SSS"
来仅使用第二个分数的前几位
类似的错误结果-
val data2 = spark.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
//.option("nullValue", "")
.option("dateFormat", "yyyy-MMM-dd")
.option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSS")
.schema(dataSchema)
.load("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")
data2.select('Created_TS).show
+--------------------+
| Created_TS|
+--------------------+
|2019-09-28 23:35:...|
+--------------------+
在使用 csv reader 创建数据帧 时,有什么方法可以解析此类时间戳字符串?
解析日期的DataFrameReader uses the SimpleDateFormat:
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
不幸的是,SimpleDateFormat 不支持纳秒,因此最后一个点之后的日期部分将被解释为 456789123 毫秒,大约 126 小时。这个时间被添加到你的日期,这解释了你看到的奇怪结果。有关此主题的更多详细信息,请参阅 this answer。
因此必须在读取 csv 后的第二步中解析日期,例如使用使用 DateTimeFormatter:
的 udf
val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS_String", StringType, true)))
var df = spark.read.option("header", false)
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
.schema(dataSchema)
.csv("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")
val toDate = udf((date: String) => {
val formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
Timestamp.valueOf(LocalDateTime.parse(date, formatter))
})
df = df.withColumn("Created_TS", toDate('Created_TS_String))
这是受 werner 关于使用 udfs 的回答启发的解决方案..-
输入 csv -
101,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM
带有 TimestampType 列的原始架构
val orig_schema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true), StructField("Updated_TS", TimestampType, true), StructField("Modified_TS", TimestampType, true)))
将所有 TimestampType 转换为 StringType
val dataSchema = StructType(orig_schema.map(x =>
{
x.dataType match {
case TimestampType => StructField(x.name, StringType, x.nullable)
case _ => x
}
}))
用于将 String 转换为 Timstamp 的 toDate 函数
//TODO parameterize string formats
def toDate(date: String): java.sql.Timestamp = {
val formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
Timestamp.valueOf(LocalDateTime.parse(date, formatter))
}
// register toDate as udf
val to_timestamp = spark.sqlContext.udf.register("to_timestamp", toDate _)
从原始数据帧 select 创建列表达式
// Array of Column Name & Types
val nameType: Array[(String, DataType)] = orig_schema.fields.map(f => (f.name, f.dataType))
// Create Column Expression to select from raw Dataframe
val selectExpr = nameType.map(f => {
f._2 match {
case TimestampType => expr(s"CASE WHEN ${f._1} is NULL THEN NULL ELSE to_timestamp(${f._1}) END AS ${f._1}")
case _ => expr(s"${f._1}")
}
})
读取为 StringType ,使用列 select 或使用 udf 将字符串转换为时间戳的表达式
val data = spark.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
//.option("nullValue", "")
.option("dateFormat", "yyyy-MMM-dd")
.option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
.schema(dataSchema)
.load("C:\TestData\Raw\TetraPak\Shipments\TestTimestamp_new.csv").select(selectExpr: _*)
data.show
这是所需的输出..所以现在我不必担心列数和使用 udf 手动创建表达式
+-----+--------------------+--------------------+--------------------+
| ID| Created_TS| Updated_TS| Modified_TS|
+-----+--------------------+--------------------+--------------------+
|101.0|2019-09-23 11:42:...|2019-09-23 11:42:...|2019-09-23 11:42:...|
+-----+--------------------+--------------------+--------------------+
我正在尝试读取时间戳到纳秒的 csv 文件。 文件 TestTimestamp.csv-
的示例内容spark- 2.4.0, scala - 2.11.11
/**
* TestTimestamp.csv -
* 101,2019-SEP-23 11.42.35.456789123 AM
*
*/
尝试使用 timestampFormat = "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa"
读取它val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true)))
val data = spark.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
//.option("nullValue", "")
.option("dateFormat", "yyyy-MMM-dd")
.option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
.schema(dataSchema)
.load("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")
data.select('Created_TS).show
我得到的输出是完全错误的日期时间。 9月23日改为9月28日
+--------------------+
| Created_TS|
+--------------------+
|2019-09-28 18:35:...|
+--------------------+
即使我有 24 小时格式的时间,例如 - “2019-SEP-23 16.42.35.456789123” 我尝试通过给出 timestampFormat = "yyyy-MMM-dd HH.mm.ss.SSS"
来仅使用第二个分数的前几位类似的错误结果-
val data2 = spark.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
//.option("nullValue", "")
.option("dateFormat", "yyyy-MMM-dd")
.option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSS")
.schema(dataSchema)
.load("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")
data2.select('Created_TS).show
+--------------------+
| Created_TS|
+--------------------+
|2019-09-28 23:35:...|
+--------------------+
在使用 csv reader 创建数据帧 时,有什么方法可以解析此类时间戳字符串?
解析日期的DataFrameReader uses the SimpleDateFormat:
timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to timestamp type.
不幸的是,SimpleDateFormat 不支持纳秒,因此最后一个点之后的日期部分将被解释为 456789123 毫秒,大约 126 小时。这个时间被添加到你的日期,这解释了你看到的奇怪结果。有关此主题的更多详细信息,请参阅 this answer。
因此必须在读取 csv 后的第二步中解析日期,例如使用使用 DateTimeFormatter:
的 udfval dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS_String", StringType, true)))
var df = spark.read.option("header", false)
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
.schema(dataSchema)
.csv("C:\TestData\Raw\TetraPak\Shipments\TestTimeStamp.csv")
val toDate = udf((date: String) => {
val formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
Timestamp.valueOf(LocalDateTime.parse(date, formatter))
})
df = df.withColumn("Created_TS", toDate('Created_TS_String))
这是受 werner 关于使用 udfs 的回答启发的解决方案..-
输入 csv -
101,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM
带有 TimestampType 列的原始架构
val orig_schema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true), StructField("Updated_TS", TimestampType, true), StructField("Modified_TS", TimestampType, true)))
将所有 TimestampType 转换为 StringType
val dataSchema = StructType(orig_schema.map(x =>
{
x.dataType match {
case TimestampType => StructField(x.name, StringType, x.nullable)
case _ => x
}
}))
用于将 String 转换为 Timstamp 的 toDate 函数
//TODO parameterize string formats
def toDate(date: String): java.sql.Timestamp = {
val formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter()
Timestamp.valueOf(LocalDateTime.parse(date, formatter))
}
// register toDate as udf
val to_timestamp = spark.sqlContext.udf.register("to_timestamp", toDate _)
从原始数据帧 select 创建列表达式
// Array of Column Name & Types
val nameType: Array[(String, DataType)] = orig_schema.fields.map(f => (f.name, f.dataType))
// Create Column Expression to select from raw Dataframe
val selectExpr = nameType.map(f => {
f._2 match {
case TimestampType => expr(s"CASE WHEN ${f._1} is NULL THEN NULL ELSE to_timestamp(${f._1}) END AS ${f._1}")
case _ => expr(s"${f._1}")
}
})
读取为 StringType ,使用列 select 或使用 udf 将字符串转换为时间戳的表达式
val data = spark.read.format("csv")
.option("header", "false")
.option("inferSchema", "false")
.option("treatEmptyValuesAsNulls", "true")
//.option("nullValue", "")
.option("dateFormat", "yyyy-MMM-dd")
.option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
.schema(dataSchema)
.load("C:\TestData\Raw\TetraPak\Shipments\TestTimestamp_new.csv").select(selectExpr: _*)
data.show
这是所需的输出..所以现在我不必担心列数和使用 udf 手动创建表达式
+-----+--------------------+--------------------+--------------------+
| ID| Created_TS| Updated_TS| Modified_TS|
+-----+--------------------+--------------------+--------------------+
|101.0|2019-09-23 11:42:...|2019-09-23 11:42:...|2019-09-23 11:42:...|
+-----+--------------------+--------------------+--------------------+