Pyspark format custom timestamp from json file
I am trying to read a custom timestamp from a loaded JSON file:
{"positionmessage":{"callsign": "PPH1", "name": "testschip-10", "mmsi": 100,"timestamplast": "2019-08-01T00:00:08Z"}}
{"positionmessage":{"callsign": "PPH2", "name": "testschip-11", "mmsi": 200,"timestamplast": "2019-08-01T00:00:01Z"}}
The code looks like this:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, DateType, FloatType, TimestampType
appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()
schema = StructType([
    StructField("positionmessage",
                StructType([
                    StructField('callsign', StringType(), True),
                    StructField('name', StringType(), True),
                    StructField('timestamplast', TimestampType(), True),
                    StructField('mmsi', IntegerType(), True)
                ]))])
file_name = "data.json"
df = spark.read.json(file_name).select("positionmessage.*")
How do I get the timestamp displayed as yyyy-MM-dd HH:mm:ss?
Read the JSON file with the schema applied:
spark.read.schema(schema).json(file_name).select("positionmessage.*")
#+--------+------------+-------------------+----+
#|callsign| name| timestamplast|mmsi|
#+--------+------------+-------------------+----+
#| PPH1|testschip-10|2019-07-31 19:00:08| 100|
#| PPH2|testschip-11|2019-07-31 19:00:01| 200|
#+--------+------------+-------------------+----+
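The table above shows the instants shifted back by five hours, which is consistent with a session timezone five hours behind UTC, such as America/Chicago in August. The shift can be reproduced outside Spark with plain Python (an illustrative sketch; the actual session timezone depends on your environment):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

# The JSON value "2019-08-01T00:00:08Z" is an instant in UTC.
utc_ts = datetime(2019, 8, 1, 0, 0, 8, tzinfo=timezone.utc)

# Rendering that instant in a UTC-5 zone reproduces the shifted display above.
local_ts = utc_ts.astimezone(ZoneInfo("America/Chicago"))
print(local_ts.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-07-31 19:00:08
```

In Spark itself the display timezone can be controlled with the spark.sql.session.timeZone configuration, e.g. spark.conf.set("spark.sql.session.timeZone", "UTC").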
Because timestamplast is defined as TimestampType(), Spark converts the timestamp to the session's local time when displaying it. If you don't want that automatic conversion, define timestamplast as StringType() in the schema and use the to_timestamp() function to get the yyyy-MM-dd HH:mm:ss format:
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark.read.schema(schema).json(file_name) \
    .select("positionmessage.*") \
    .withColumn("timestamplast", to_timestamp(col("timestamplast"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
    .show()
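For intuition, the Java/Spark pattern yyyy-MM-dd'T'HH:mm:ss'Z' corresponds to %Y-%m-%dT%H:%M:%SZ in Python's strptime. A quick standalone check of the parse-and-reformat round trip (illustrative only, not part of the Spark job):

```python
from datetime import datetime

raw = "2019-08-01T00:00:08Z"
# Parse the ISO-style string, then re-render it without the 'T' and 'Z'.
parsed = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%SZ")
print(parsed.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-08-01 00:00:08
```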
# Using the from_unixtime and unix_timestamp functions instead
spark.read.schema(schema).json(file_name) \
    .select("positionmessage.*") \
    .withColumn("timestamplast", from_unixtime(unix_timestamp(col("timestamplast"), "yyyy-MM-dd'T'HH:mm:ss'Z'"), "yyyy-MM-dd HH:mm:ss")) \
    .show()
#+--------+------------+-------------------+----+
#|callsign| name| timestamplast|mmsi|
#+--------+------------+-------------------+----+
#| PPH1|testschip-10|2019-08-01 00:00:08| 100|
#| PPH2|testschip-11|2019-08-01 00:00:01| 200|
#+--------+------------+-------------------+----+