Spark:检索嵌套结构列的数据类型
Spark: retrieve datatype of nested Struct column
我目前正在从事一项工作,将嵌套的 Json 文件加载为数据框,对其执行一些转换,然后将其加载到增量 table 中。我使用的测试数据有很多嵌套列,但是作业将来加载的 json 文件可能不会一直包含所有列(或者它们具有不同的数据类型)。因此,我想首先检查一列是否存在以及它具有哪种数据类型。问题:我没有让它工作,因为我不知道如何从数据框的嵌套模式中导出列的数据类型。
示例:如何获取ecuId的数据类型?
到目前为止我的方法是:
df.withColumn("datatype", isinstance(col("reportData.ecus.element.ecuId"), (float, int, str, list, dict, tuple)))
或
df.withColumn("datatype", isinstance(jsonDF.reportData.ecus.element.ecuId, (float, int, str, list, dict, tuple)))
对于这两个版本,我都收到错误消息:“col should be Column”
即使我尝试非常基本的
df.withColumn("datatype", type(jsonDF.reportData.ecus.element.ecuId))
我得到了同样的错误。
看来我对如何使用嵌套结构完全误解了?
你能解释一下我是如何获得数据类型的吗?非常感谢!
您收到错误 col should be Column
的原因是因为 withColumn
期望第二个参数是 Column
对象,而不是纯 Python 对象。
我得到的最接近的方法有点“hacky”,通过手动解析数据帧的模式。
from pyspark.sql import functions as F
(df
.withColumn('schema', F.lit(df.dtypes[0][1]))
.withColumn('datatype', F.regexp_extract('schema', 'ecus.*.ecuId:([^>]*)', 1))
.show(10, False)
)
# Output
# +----------+----------------------------------------+--------+
# |reportData|schema |datatype|
# +----------+----------------------------------------+--------+
# |{[{1000}]}|struct<ecus:array<struct<ecuId:bigint>>>|bigint |
# +----------+----------------------------------------+--------+
我目前正在从事一项工作,将嵌套的 Json 文件加载为数据框,对其执行一些转换,然后将其加载到增量 table 中。我使用的测试数据有很多嵌套列,但是作业将来加载的 json 文件可能不会一直包含所有列(或者它们具有不同的数据类型)。因此,我想首先检查一列是否存在以及它具有哪种数据类型。问题:我没有让它工作,因为我不知道如何从数据框的嵌套模式中导出列的数据类型。
示例:如何获取ecuId的数据类型?
到目前为止我的方法是:
df.withColumn("datatype", isinstance(col("reportData.ecus.element.ecuId"), (float, int, str, list, dict, tuple)))
或
df.withColumn("datatype", isinstance(jsonDF.reportData.ecus.element.ecuId, (float, int, str, list, dict, tuple)))
对于这两个版本,我都收到错误消息:“col should be Column” 即使我尝试非常基本的
df.withColumn("datatype", type(jsonDF.reportData.ecus.element.ecuId))
我得到了同样的错误。 看来我对如何使用嵌套结构完全误解了? 你能解释一下我是如何获得数据类型的吗?非常感谢!
您收到错误 col should be Column
的原因是因为 withColumn
期望第二个参数是 Column
对象,而不是纯 Python 对象。
我得到的最接近的方法有点“hacky”,通过手动解析数据帧的模式。
from pyspark.sql import functions as F
(df
.withColumn('schema', F.lit(df.dtypes[0][1]))
.withColumn('datatype', F.regexp_extract('schema', 'ecus.*.ecuId:([^>]*)', 1))
.show(10, False)
)
# Output
# +----------+----------------------------------------+--------+
# |reportData|schema |datatype|
# +----------+----------------------------------------+--------+
# |{[{1000}]}|struct<ecus:array<struct<ecuId:bigint>>>|bigint |
# +----------+----------------------------------------+--------+