在pyspark中将时间戳转换为纪元毫秒
Converting timestamp to epoch milliseconds in pyspark
我有如下数据集:
epoch_seconds
eq_time
1636663343887
2021-11-12 02:12:23
现在,我正在尝试将 eq_time
秒转换为 epoch
秒,这应该与第一列的值匹配,但我无法这样做。下面是我的代码:
df = spark.sql("select '1636663343887' as epoch_seconds")
df1 = df.withColumn("eq_time", from_unixtime(col("epoch_seconds") / 1000))
df2 = df1.withColumn("epoch_sec", unix_timestamp(df1.eq_time))
df2.show(truncate=False)
我得到如下输出:
epoch_seconds
eq_time
epoch_sec
1636663343887
2021-11-12 02:12:23
1636663343
我尝试了 但没有帮助。我的 expected
输出是第一列和第三列应该相互匹配。
P.S:我在本地使用 Spark 3.1.1
版本,而在生产环境中使用 Spark 2.4.3
,我的最终目标是运行 投入生产。
要在 Python 中的时间格式之间进行转换,datetime.datetime.strptime()
和 .strftime()
很有用。
从 eq_time 读取字符串并处理成 Python 日期时间对象:
import datetime
t = datetime.datetime.strptime('2021-11-12 02:12:23', '%Y-%m-%d %H:%M:%S')
以epoch_seconds格式打印t:
print(t.strftime('%s')
Pandas 具有日期处理功能,其工作方式类似:
您可以 运行 在 eq_time 列中,在提取数据后立即这样做,以确保您的 DataFrame 包含格式正确的日期
在将纪元转换为 spark 时间戳类型时,使用 to_timestamp
而不是 from_unixtime
来保留毫秒部分。
然后,要返回到以毫秒为单位的时间戳,您可以使用 unix_timestamp
函数或通过转换为 long 类型,并将结果与您使用 [= 获得的时间戳的秒数部分连接起来17=]:
import pyspark.sql.functions as F
df = spark.sql("select '1636663343887' as epoch_ms")
df2 = df.withColumn(
"eq_time",
F.to_timestamp(F.col("epoch_ms") / 1000)
).withColumn(
"epoch_milli",
F.concat(F.unix_timestamp("eq_time"), F.date_format("eq_time", "S"))
)
df2.show(truncate=False)
#+-------------+-----------------------+-------------+
#|epoch_ms |eq_time |epoch_milli |
#+-------------+-----------------------+-------------+
#|1636663343887|2021-11-11 21:42:23.887|1636663343887|
#+-------------+-----------------------+-------------+
我更喜欢只使用 cast
.
进行时间戳转换
from pyspark.sql.functions import col
df = spark.sql("select '1636663343887' as epoch_seconds")
df = df.withColumn("eq_time", (col("epoch_seconds") / 1000).cast("timestamp"))
df = df.withColumn("epoch_sec", (col("eq_time").cast("double") * 1000).cast("long"))
df.show(truncate=False)
如果你这样做,你需要在几秒钟内思考,它会完美地工作。
我有如下数据集:
epoch_seconds | eq_time |
---|---|
1636663343887 | 2021-11-12 02:12:23 |
现在,我正在尝试将 eq_time
秒转换为 epoch
秒,这应该与第一列的值匹配,但我无法这样做。下面是我的代码:
df = spark.sql("select '1636663343887' as epoch_seconds")
df1 = df.withColumn("eq_time", from_unixtime(col("epoch_seconds") / 1000))
df2 = df1.withColumn("epoch_sec", unix_timestamp(df1.eq_time))
df2.show(truncate=False)
我得到如下输出:
epoch_seconds | eq_time | epoch_sec |
---|---|---|
1636663343887 | 2021-11-12 02:12:23 | 1636663343 |
我尝试了 expected
输出是第一列和第三列应该相互匹配。
P.S:我在本地使用 Spark 3.1.1
版本,而在生产环境中使用 Spark 2.4.3
,我的最终目标是运行 投入生产。
要在 Python 中的时间格式之间进行转换,datetime.datetime.strptime()
和 .strftime()
很有用。
从 eq_time 读取字符串并处理成 Python 日期时间对象:
import datetime
t = datetime.datetime.strptime('2021-11-12 02:12:23', '%Y-%m-%d %H:%M:%S')
以epoch_seconds格式打印t:
print(t.strftime('%s')
Pandas 具有日期处理功能,其工作方式类似:
您可以 运行 在 eq_time 列中,在提取数据后立即这样做,以确保您的 DataFrame 包含格式正确的日期
在将纪元转换为 spark 时间戳类型时,使用 to_timestamp
而不是 from_unixtime
来保留毫秒部分。
然后,要返回到以毫秒为单位的时间戳,您可以使用 unix_timestamp
函数或通过转换为 long 类型,并将结果与您使用 [= 获得的时间戳的秒数部分连接起来17=]:
import pyspark.sql.functions as F
df = spark.sql("select '1636663343887' as epoch_ms")
df2 = df.withColumn(
"eq_time",
F.to_timestamp(F.col("epoch_ms") / 1000)
).withColumn(
"epoch_milli",
F.concat(F.unix_timestamp("eq_time"), F.date_format("eq_time", "S"))
)
df2.show(truncate=False)
#+-------------+-----------------------+-------------+
#|epoch_ms |eq_time |epoch_milli |
#+-------------+-----------------------+-------------+
#|1636663343887|2021-11-11 21:42:23.887|1636663343887|
#+-------------+-----------------------+-------------+
我更喜欢只使用 cast
.
from pyspark.sql.functions import col
df = spark.sql("select '1636663343887' as epoch_seconds")
df = df.withColumn("eq_time", (col("epoch_seconds") / 1000).cast("timestamp"))
df = df.withColumn("epoch_sec", (col("eq_time").cast("double") * 1000).cast("long"))
df.show(truncate=False)
如果你这样做,你需要在几秒钟内思考,它会完美地工作。