如何在 pyspark 中读取自定义格式的日期作为时间戳
How to read custom formatted dates as timestamp in pyspark
我想使用 spark.read() 从 .csv 文件中提取数据,同时执行架构。但是,我无法让 spark 将我的日期识别为时间戳。
首先我创建了一个虚拟文件来测试
%scala
Seq("1|1/15/2019 2:24:00 AM","2|test","3|").toDF().write.text("/tmp/input/csvDateReadTest")
然后我尝试读取它,并提供一个 dateFormat 字符串,但它无法识别我的日期,并将记录发送到 badRecordsPath
df = spark.read.format('csv')
.schema("id int, dt timestamp")
.option("delimiter","|")
.option("badRecordsPath","/tmp/badRecordsPath")
.option("dateFormat","M/dd/yyyy hh:mm:ss aaa")
.load("/tmp/input/csvDateReadTest")
结果,当我期望看到 2 条记录时,我在 df (ID 3) 中只得到了 1 条记录。(ID 1 和 3)
df.show()
+---+----+
| id| dt|
+---+----+
| 3|null|
+---+----+
您好,这是示例代码
df.withColumn("times",
from_unixtime(unix_timestamp(col("df"), "M/dd/yyyy hh:mm:ss a"),
"yyyy-MM-dd HH:mm:ss.SSSSSS"))
.show(false)
您必须将 dateFormat
更改为 timestampFormat
因为在您的情况下您需要时间戳类型而不是日期。此外,时间戳格式的值应为 mm/dd/yyyy h:mm:ss a
.
示例数据:
Seq(
"1|1/15/2019 2:24:00 AM",
"2|test",
"3|5/30/1981 3:11:00 PM"
).toDF().write.text("/tmp/input/csvDateReadTest")
随着时间戳的变化:
val df = spark.read.format("csv")
.schema("id int, dt timestamp")
.option("delimiter","|")
.option("badRecordsPath","/tmp/badRecordsPath")
.option("timestampFormat","mm/dd/yyyy h:mm:ss a")
.load("/tmp/input/csvDateReadTest")
输出:
+----+-------------------+
| id| dt|
+----+-------------------+
| 1|2019-01-15 02:24:00|
| 3|1981-01-30 15:11:00|
|null| null|
+----+-------------------+
请注意,id 为 2 的记录不符合架构定义,因此它将包含 null
。如果您还想保留无效记录,您需要将时间戳列更改为字符串,这种情况下的输出将是:
+---+--------------------+
| id| dt|
+---+--------------------+
| 1|1/15/2019 2:24:00 AM|
| 3|5/30/1981 3:11:00 PM|
| 2| test|
+---+--------------------+
更新:
为了将字符串 dt 更改为时间戳类型,您可以尝试使用 df.withColumn("dt", $"dt".cast("timestamp"))
,尽管这会失败并将所有值替换为 null。
您可以使用下一个代码实现此目的:
import org.apache.spark.sql.Row
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import java.sql.Timestamp
import scala.util.{Try, Success, Failure}
val formatter = new SimpleDateFormat("mm/dd/yyyy h:mm:ss a", Locale.US)
df.map{ case Row(id:Int, dt:String) =>
val tryParse = Try[Date](formatter.parse(dt))
val p_timestamp = tryParse match {
case Success(parsed) => new Timestamp(parsed.getTime())
case Failure(_) => null
}
(id, p_timestamp)
}.toDF("id", "dt").show
输出:
+---+-------------------+
| id| dt|
+---+-------------------+
| 1|2019-01-15 02:24:00|
| 3|1981-01-30 15:11:00|
| 2| null|
+---+-------------------+
我想使用 spark.read() 从 .csv 文件中提取数据,同时执行架构。但是,我无法让 spark 将我的日期识别为时间戳。
首先我创建了一个虚拟文件来测试
%scala
Seq("1|1/15/2019 2:24:00 AM","2|test","3|").toDF().write.text("/tmp/input/csvDateReadTest")
然后我尝试读取它,并提供一个 dateFormat 字符串,但它无法识别我的日期,并将记录发送到 badRecordsPath
df = spark.read.format('csv')
.schema("id int, dt timestamp")
.option("delimiter","|")
.option("badRecordsPath","/tmp/badRecordsPath")
.option("dateFormat","M/dd/yyyy hh:mm:ss aaa")
.load("/tmp/input/csvDateReadTest")
结果,当我期望看到 2 条记录时,我在 df (ID 3) 中只得到了 1 条记录。(ID 1 和 3)
df.show()
+---+----+
| id| dt|
+---+----+
| 3|null|
+---+----+
您好,这是示例代码
df.withColumn("times",
from_unixtime(unix_timestamp(col("df"), "M/dd/yyyy hh:mm:ss a"),
"yyyy-MM-dd HH:mm:ss.SSSSSS"))
.show(false)
您必须将 dateFormat
更改为 timestampFormat
因为在您的情况下您需要时间戳类型而不是日期。此外,时间戳格式的值应为 mm/dd/yyyy h:mm:ss a
.
示例数据:
Seq(
"1|1/15/2019 2:24:00 AM",
"2|test",
"3|5/30/1981 3:11:00 PM"
).toDF().write.text("/tmp/input/csvDateReadTest")
随着时间戳的变化:
val df = spark.read.format("csv")
.schema("id int, dt timestamp")
.option("delimiter","|")
.option("badRecordsPath","/tmp/badRecordsPath")
.option("timestampFormat","mm/dd/yyyy h:mm:ss a")
.load("/tmp/input/csvDateReadTest")
输出:
+----+-------------------+
| id| dt|
+----+-------------------+
| 1|2019-01-15 02:24:00|
| 3|1981-01-30 15:11:00|
|null| null|
+----+-------------------+
请注意,id 为 2 的记录不符合架构定义,因此它将包含 null
。如果您还想保留无效记录,您需要将时间戳列更改为字符串,这种情况下的输出将是:
+---+--------------------+
| id| dt|
+---+--------------------+
| 1|1/15/2019 2:24:00 AM|
| 3|5/30/1981 3:11:00 PM|
| 2| test|
+---+--------------------+
更新:
为了将字符串 dt 更改为时间戳类型,您可以尝试使用 df.withColumn("dt", $"dt".cast("timestamp"))
,尽管这会失败并将所有值替换为 null。
您可以使用下一个代码实现此目的:
import org.apache.spark.sql.Row
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import java.sql.Timestamp
import scala.util.{Try, Success, Failure}
val formatter = new SimpleDateFormat("mm/dd/yyyy h:mm:ss a", Locale.US)
df.map{ case Row(id:Int, dt:String) =>
val tryParse = Try[Date](formatter.parse(dt))
val p_timestamp = tryParse match {
case Success(parsed) => new Timestamp(parsed.getTime())
case Failure(_) => null
}
(id, p_timestamp)
}.toDF("id", "dt").show
输出:
+---+-------------------+
| id| dt|
+---+-------------------+
| 1|2019-01-15 02:24:00|
| 3|1981-01-30 15:11:00|
| 2| null|
+---+-------------------+