从 Spark 写入镶木地板时如何处理空值
How to handle null values when writing to parquet from Spark
直到最近 parquet
不支持 null
值 - 一个有问题的前提。事实上,最近的版本终于添加了支持:
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
然而,spark
支持新的 parquet
功能还需要很长时间——如果有的话。这是关联的 (closed - will not fix
) JIRA:
https://issues.apache.org/jira/browse/SPARK-10943
那么,今天 将 dataframe
写成 parquet
时,人们对空列值做了什么?我只能想到 非常 丑陋可怕的黑客,比如写空字符串和..好吧..我没有知道如何处理数值指示 null
- 缺少一些标记值并让我的代码检查它(这很不方便且容易出错)。
您误解了 SPARK-10943。 Spark 确实支持将 null
值写入数字列。
问题是 null
本身根本不携带任何类型信息
scala> spark.sql("SELECT null as comments").printSchema
root
|-- comments: null (nullable = true)
根据comment by Michael Armbrust,你所要做的就是投射:
scala> spark.sql("""SELECT CAST(null as DOUBLE) AS comments""").printSchema
root
|-- comments: double (nullable = true)
结果可以安全地写入 Parquet。
我为此写了一个 PySpark 解决方案(df
是一个包含 NullType
列的数据框):
# get dataframe schema
my_schema = list(df.schema)
null_cols = []
# iterate over schema list to filter for NullType columns
for st in my_schema:
if str(st.dataType) == 'NullType':
null_cols.append(st)
# cast null type columns to string (or whatever you'd like)
for ncol in null_cols:
mycolname = str(ncol.name)
df = df \
.withColumn(mycolname, df[mycolname].cast('string'))
直到最近 parquet
不支持 null
值 - 一个有问题的前提。事实上,最近的版本终于添加了支持:
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
然而,spark
支持新的 parquet
功能还需要很长时间——如果有的话。这是关联的 (closed - will not fix
) JIRA:
https://issues.apache.org/jira/browse/SPARK-10943
那么,今天 将 dataframe
写成 parquet
时,人们对空列值做了什么?我只能想到 非常 丑陋可怕的黑客,比如写空字符串和..好吧..我没有知道如何处理数值指示 null
- 缺少一些标记值并让我的代码检查它(这很不方便且容易出错)。
您误解了 SPARK-10943。 Spark 确实支持将 null
值写入数字列。
问题是 null
本身根本不携带任何类型信息
scala> spark.sql("SELECT null as comments").printSchema
root
|-- comments: null (nullable = true)
根据comment by Michael Armbrust,你所要做的就是投射:
scala> spark.sql("""SELECT CAST(null as DOUBLE) AS comments""").printSchema
root
|-- comments: double (nullable = true)
结果可以安全地写入 Parquet。
我为此写了一个 PySpark 解决方案(df
是一个包含 NullType
列的数据框):
# get dataframe schema
my_schema = list(df.schema)
null_cols = []
# iterate over schema list to filter for NullType columns
for st in my_schema:
if str(st.dataType) == 'NullType':
null_cols.append(st)
# cast null type columns to string (or whatever you'd like)
for ncol in null_cols:
mycolname = str(ncol.name)
df = df \
.withColumn(mycolname, df[mycolname].cast('string'))