Load JSON file to Spark dataframe

I am trying to load the following data.json file into a Spark dataframe:

{"positionmessage":{"callsign": "PPH1", "name": 0.0, "mmsi": 100}}
{"positionmessage":{"callsign": "PPH2", "name": 0.0, "mmsi": 200}}
{"positionmessage":{"callsign": "PPH3", "name": 0.0, "mmsi": 300}}

with the following code:

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# Create a schema for the dataframe
schema = StructType([
    StructField('callsign', StringType(), True),
    StructField('name', StringType(), True),
    StructField('mmsi', IntegerType(), True)
])

# Create data frame
json_file_path = "data.json"
df = spark.read.json(json_file_path, schema, multiLine=True)
print(df.schema)
print(df.head(3))

It prints: [Row(callsign=None, name=None, mmsi=None)]. What am I doing wrong? I have already set my environment variables in the system settings.

Your JSON has a positionmessage struct field that is missing from your schema.

Change the schema to include the struct field, as follows:

schema = StructType([
    StructField("positionmessage", StructType([
        StructField('callsign', StringType(), True),
        StructField('name', StringType(), True),
        StructField('mmsi', IntegerType(), True)
    ]))
])

spark.read.schema(schema).json("<path>") \
    .select("positionmessage.*") \
    .show()
#+--------+----+----+
#|callsign|name|mmsi|
#+--------+----+----+
#|    PPH1| 0.0| 100|
#|    PPH2| 0.0| 200|
#|    PPH3| 0.0| 300|
#+--------+----+----+
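To see why the original flat schema produced only nulls, it helps to look at the actual shape of each record. A minimal sketch with the standard json module (no Spark needed), assuming the three lines of data.json shown in the question:

```python
import json

# Each line of data.json is an independent JSON object (JSON Lines format).
lines = [
    '{"positionmessage":{"callsign": "PPH1", "name": 0.0, "mmsi": 100}}',
    '{"positionmessage":{"callsign": "PPH2", "name": 0.0, "mmsi": 200}}',
    '{"positionmessage":{"callsign": "PPH3", "name": 0.0, "mmsi": 300}}',
]

for line in lines:
    record = json.loads(line)
    # The only top-level key is "positionmessage"; callsign/name/mmsi sit
    # one level down, so a flat schema matched nothing at the top level
    # and every column came back null.
    inner = record["positionmessage"]
    print(inner["callsign"], inner["name"], inner["mmsi"])
```

Selecting "positionmessage.*" in Spark performs the same flattening. Note also that multiLine=True is meant for files where a single JSON record spans multiple lines; for a line-delimited file like this one it should be left at its default.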