Load a JSON file into a Spark DataFrame
I am trying to load the following data.json file into a Spark DataFrame:
{"positionmessage":{"callsign": "PPH1", "name": 0.0, "mmsi": 100}}
{"positionmessage":{"callsign": "PPH2", "name": 0.0, "mmsi": 200}}
{"positionmessage":{"callsign": "PPH3", "name": 0.0, "mmsi": 300}}
using the following code:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

appName = "PySpark Example - JSON file to Spark Data Frame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# Create a schema for the dataframe
schema = StructType([
    StructField('callsign', StringType(), True),
    StructField('name', StringType(), True),
    StructField('mmsi', IntegerType(), True)
])

# Create data frame
json_file_path = "data.json"
df = spark.read.json(json_file_path, schema, multiLine=True)
print(df.schema)
print(df.head(3))
It prints: [Row(callsign=None, name=None, mmsi=None)].
What am I doing wrong? I have already set my environment variables in the system settings.
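Your data has a positionmessage struct field wrapping every record, and that field is missing from your schema. Spark looks for callsign, name and mmsi at the top level, finds none of them, and returns nulls.
Note also that the code below omits multiLine=True: the file holds one JSON object per line, which is what Spark's JSON reader expects by default.
Change the schema to include the struct field, as shown below: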
schema = StructType([
    StructField("positionmessage", StructType([
        StructField('callsign', StringType(), True),
        StructField('name', StringType(), True),
        StructField('mmsi', IntegerType(), True)
    ]))
])

spark.read.schema(schema).json("<path>").\
    select("positionmessage.*").\
    show()
#+--------+----+----+
#|callsign|name|mmsi|
#+--------+----+----+
#|    PPH1| 0.0| 100|
#|    PPH2| 0.0| 200|
#|    PPH3| 0.0| 300|
#+--------+----+----+
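If it helps to see why the flat schema gives nulls, here is a rough plain-Python analogy using only the standard json module. It is not how Spark applies a schema internally; the project helper and the FIELDS list are made up for illustration. It only shows that looking up the three field names at the top level of each record finds nothing, while looking them up inside positionmessage finds the values.

```python
import json

# The three sample records from data.json, one JSON object per line.
raw_lines = [
    '{"positionmessage":{"callsign": "PPH1", "name": 0.0, "mmsi": 100}}',
    '{"positionmessage":{"callsign": "PPH2", "name": 0.0, "mmsi": 200}}',
    '{"positionmessage":{"callsign": "PPH3", "name": 0.0, "mmsi": 300}}',
]

# Hypothetical stand-in for the field names listed in the schema.
FIELDS = ["callsign", "name", "mmsi"]


def project(record, fields):
    """Pick the named fields from a dict, using None for any that are absent."""
    return {field: record.get(field) for field in fields}


records = [json.loads(line) for line in raw_lines]

# Flat lookup: the only top-level key is "positionmessage",
# so every requested field comes back as None.
flat_rows = [project(record, FIELDS) for record in records]

# Nested lookup: step into "positionmessage" first, then pick the fields.
nested_rows = [project(record["positionmessage"], FIELDS) for record in records]

print(flat_rows[0])    # {'callsign': None, 'name': None, 'mmsi': None}
print(nested_rows[0])  # {'callsign': 'PPH1', 'name': 0.0, 'mmsi': 100}
```

The flat lookup mirrors the all-None row from the question, and the nested lookup mirrors what select("positionmessage.*") returns once the schema includes the outer struct.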