Spark Dataframe returns NULL for entire row when one column value of that row is NULL

Input data:

{"driverId":1,"driverRef":"hamilton","number":44,"code":"HAM","name":{"forename":"Lewis","surname":"Hamilton"},"dob":"1985-01-07","nationality":"British","url":"http://en.wikipedia.org/wiki/Lewis_Hamilton"}
{"driverId":2,"driverRef":"heidfeld","number":"\N","code":"HEI","name":{"forename":"Nick","surname":"Heidfeld"},"dob":"1977-05-10","nationality":"German","url":"http://en.wikipedia.org/wiki/Nick_Heidfeld"}
{"driverId":3,"driverRef":"rosberg","number":6,"code":"ROS","name":{"forename":"Nico","surname":"Rosberg"},"dob":"1985-06-27","nationality":"German","url":"http://en.wikipedia.org/wiki/Nico_Rosberg"}
{"driverId":4,"driverRef":"alonso","number":14,"code":"ALO","name":{"forename":"Fernando","surname":"Alonso"},"dob":"1981-07-29","nationality":"Spanish","url":"http://en.wikipedia.org/wiki/Fernando_Alonso"}
{"driverId":5,"driverRef":"kovalainen","number":"\N","code":"KOV","name":{"forename":"Heikki","surname":"Kovalainen"},"dob":"1981-10-19","nationality":"Finnish","url":"http://en.wikipedia.org/wiki/Heikki_Kovalainen"}
{"driverId":6,"driverRef":"nakajima","number":"\N","code":"NAK","name":{"forename":"Kazuki","surname":"Nakajima"},"dob":"1985-01-11","nationality":"Japanese","url":"http://en.wikipedia.org/wiki/Kazuki_Nakajima"}
{"driverId":7,"driverRef":"bourdais","number":"\N","code":"BOU","name":{"forename":"Sébastien","surname":"Bourdais"},"dob":"1979-02-28","nationality":"French","url":"http://en.wikipedia.org/wiki/S%C3%A9bastien_Bourdais"}

After reading this data into a Spark dataframe and displaying the df, I can see that the entire rows for driverId 2, 5, 6, and 7 are NULL. I can also see that the number column value for those driver IDs is NULL.

Here is my code. Is there any mistake in it?

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

name_field = StructType(fields=[
  StructField("forename", StringType(), True),
  StructField("surname", StringType(), True)
])

driver_schema = StructType(fields=[
  StructField("driverId", IntegerType(), False),
  StructField("driverRef", StringType(), True),
  StructField("number", IntegerType(), True),
  StructField("code", StringType(), True),
  StructField("name", name_field, True),
  StructField("dob", DateType(), True),
  StructField("nationality", StringType(), True),
  StructField("url", StringType(), True)
])
 
driver_df = spark.read\
.schema(driver_schema)\
.json('dbfs:/mnt/databrickslearnf1azure/raw/drivers.json')

driver_df.printSchema()
root
 |-- driverId: integer (nullable = true)
 |-- driverRef: string (nullable = true)
 |-- number: integer (nullable = true)
 |-- code: string (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- forename: string (nullable = true)
 |    |-- surname: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- nationality: string (nullable = true)
 |-- url: string (nullable = true)

display(driver_df)
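As a debugging aid, one way to surface the rows that fail to parse is to add Spark's corrupt-record column to the schema. This is a minimal sketch, assuming the default PERMISSIVE mode and the same file path: in PERMISSIVE mode, Spark nulls every field of a record it cannot fit to the schema and, if the schema contains a _corrupt_record string column, keeps the raw JSON text of that record there.

from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# Extend the original schema with the default corrupt-record column
debug_schema = StructType(driver_schema.fields + [
  StructField("_corrupt_record", StringType(), True)
])

debug_df = spark.read\
.schema(debug_schema)\
.json('dbfs:/mnt/databrickslearnf1azure/raw/drivers.json')

# Cache first: Spark disallows queries on raw JSON files that reference
# only the corrupt-record column
debug_df.cache()
debug_df.where(col("_corrupt_record").isNotNull()).show(truncate=False)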

You are seeing this because of the cause described in the official Databricks documentation:

Spark 3.0 and above (Databricks Runtime 7.3 LTS and above) cannot parse JSON arrays as structs. You should pass the schema as ArrayType instead of StructType.

Solution: pass the schema as ArrayType instead of StructType.

from pyspark.sql.types import ArrayType

driver_schema = ArrayType(StructType(fields=[
  StructField("driverId", IntegerType(), False),
  StructField("driverRef", StringType(), True),
  StructField("number", IntegerType(), True),
  StructField("code", StringType(), True),
  StructField("name", name_field, True),
  StructField("dob", DateType(), True),
  StructField("nationality", StringType(), True),
  StructField("url", StringType(), True)
]))
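For context, an ArrayType schema is what from_json expects when a string column holds a JSON array rather than a single object. A minimal usage sketch follows; the DataFrame and column names here are illustrative, not from the original post:

from pyspark.sql.functions import from_json, col

# A single-row DataFrame whose column holds a JSON array as a string
raw_df = spark.createDataFrame(
  [('[{"driverId": 1, "driverRef": "hamilton"}]',)],
  ["json_col"])

# Parsing with the ArrayType schema yields an array of structs;
# fields absent from the input (name, dob, ...) come back as null
parsed_df = raw_df.select(from_json(col("json_col"), driver_schema).alias("drivers"))
parsed_df.printSchema()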

Alternatively, you can change the initial schema as shown below, treating number as a string type.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

name_field = StructType(fields=[
  StructField("forename", StringType(), True),
  StructField("surname", StringType(), True)
])

driver_schema = StructType(fields=[
  StructField("driverId", IntegerType(), False),
  StructField("driverRef", StringType(), True),
  StructField("number", StringType(), True),
  StructField("code", StringType(), True),
  StructField("name", name_field, True),
  StructField("dob", DateType(), True),
  StructField("nationality", StringType(), True),
  StructField("url", StringType(), True)
])

Then you can read the data from the JSON file with the same code you were already using:

driver_df = spark.read\
.schema(driver_schema)\
.json('dbfs:/mnt/databrickslearnf1azure/raw/drivers.json')

driver_df.printSchema()

After reading the data, you can apply logic to convert the \N values to real nulls and then change the column's data type from string to integer, as shown below:

from pyspark.sql.functions import when

# Replace the literal \N sentinel with a real null, then cast string -> int
df = driver_df.withColumn("number", when(driver_df.number == r"\N", None).otherwise(driver_df.number))
finaldf = df.withColumn("number", df.number.cast(IntegerType()))
finaldf.printSchema()
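As a side note, the when step can be skipped: casting a non-numeric string such as \N to an integer already yields null, so a direct cast produces the same result. A minimal alternative sketch:

from pyspark.sql.functions import col

# Non-numeric strings cast to int become null, so one step suffices
finaldf = driver_df.withColumn("number", col("number").cast(IntegerType()))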

Now, if you do a display or show on the dataframe, you can see the output as shown below: