如何使用 PySpark 展平嵌套结构?
How to flatten nested struct using PySpark?
如何使用 PySpark 展平嵌套结构?
数据集链接:
https://drive.google.com/file/d/1-xOpd2B7MDgS1t4ekfipHSerIm6JMz9e/view?usp=sharing
提前致谢。
虽然我同意 Phantoms 的观点,但如果您仍未解决这个问题:展平 DataFrame 其实相当基础,您可以使用下面的函数来展平您的 df:
def flattenNestedData(nestedDF):
    """Flatten every struct and array column of a PySpark DataFrame.

    Top-level columns are processed repeatedly until the schema contains no
    ``StructType`` or ``ArrayType`` column:

    * a struct column ``parent`` is replaced by one column per inner field,
      named ``parent_<inner>``; the new columns are appended after the
      existing ones and ``parent`` itself is dropped;
    * an array column is replaced in place by ``explode_outer`` of itself,
      so the resulting DataFrame can have a different number of rows.

    A progress line is printed for each column processed.

    NOTE: generated ``parent_<inner>`` names are not checked for collisions
    with columns that already exist.

    Args:
        nestedDF: the DataFrame to flatten.

    Returns:
        A DataFrame with no struct or array columns left at the top level.

    Raises:
        Exception: wraps any error raised while flattening; the original
            error is attached as ``__cause__``.
    """
    # Imported locally so the snippet is self-contained. ``explode_outer``
    # was used but never imported originally, which raised NameError as soon
    # as an array column was encountered.
    from pyspark.sql.functions import col, explode_outer
    from pyspark.sql.types import ArrayType, StructType

    def complexFields(df):
        # Map column name -> data type for the columns that still need work.
        return {
            field.name: field.dataType
            for field in df.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
        }

    try:
        fieldNames = complexFields(nestedDF)
        while fieldNames:
            # Handle one column per pass; the schema is re-read afterwards
            # because flattening may expose further nested columns.
            fieldName, fieldType = next(iter(fieldNames.items()))
            print("Processing :" + fieldName + " Type : " + str(type(fieldType)))
            if isinstance(fieldType, StructType):
                # Promote each inner field to a top-level "parent_inner" column.
                extractedFields = [
                    col(fieldName + '.' + inner.name).alias(fieldName + "_" + inner.name)
                    for inner in fieldType.fields
                ]
                nestedDF = nestedDF.select("*", *extractedFields).drop(fieldName)
            elif isinstance(fieldType, ArrayType):
                # Replace the array column with its elements, keeping the name.
                nestedDF = nestedDF.withColumn(fieldName, explode_outer(fieldName))
            fieldNames = complexFields(nestedDF)
        return nestedDF
    except Exception as err:
        raise Exception("Error Occured at while flattening the dataframe : " + str(err)) from err
如果您不想展开(explode)数组列,可以删除其中的 ArrayType 检查分支。
如何使用 PySpark 展平嵌套结构?
数据集链接:https://drive.google.com/file/d/1-xOpd2B7MDgS1t4ekfipHSerIm6JMz9e/view?usp=sharing
虽然我同意 Phantoms 的观点,但如果您仍未解决这个问题:展平 DataFrame 其实相当基础,您可以使用下面的函数来展平您的 df:
def flattenNestedData(nestedDF):
    """Flatten every struct and array column of a PySpark DataFrame.

    Top-level columns are processed repeatedly until the schema contains no
    ``StructType`` or ``ArrayType`` column:

    * a struct column ``parent`` is replaced by one column per inner field,
      named ``parent_<inner>``; the new columns are appended after the
      existing ones and ``parent`` itself is dropped;
    * an array column is replaced in place by ``explode_outer`` of itself,
      so the resulting DataFrame can have a different number of rows.

    A progress line is printed for each column processed.

    NOTE: generated ``parent_<inner>`` names are not checked for collisions
    with columns that already exist.

    Args:
        nestedDF: the DataFrame to flatten.

    Returns:
        A DataFrame with no struct or array columns left at the top level.

    Raises:
        Exception: wraps any error raised while flattening; the original
            error is attached as ``__cause__``.
    """
    # Imported locally so the snippet is self-contained. ``explode_outer``
    # was used but never imported originally, which raised NameError as soon
    # as an array column was encountered.
    from pyspark.sql.functions import col, explode_outer
    from pyspark.sql.types import ArrayType, StructType

    def complexFields(df):
        # Map column name -> data type for the columns that still need work.
        return {
            field.name: field.dataType
            for field in df.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
        }

    try:
        fieldNames = complexFields(nestedDF)
        while fieldNames:
            # Handle one column per pass; the schema is re-read afterwards
            # because flattening may expose further nested columns.
            fieldName, fieldType = next(iter(fieldNames.items()))
            print("Processing :" + fieldName + " Type : " + str(type(fieldType)))
            if isinstance(fieldType, StructType):
                # Promote each inner field to a top-level "parent_inner" column.
                extractedFields = [
                    col(fieldName + '.' + inner.name).alias(fieldName + "_" + inner.name)
                    for inner in fieldType.fields
                ]
                nestedDF = nestedDF.select("*", *extractedFields).drop(fieldName)
            elif isinstance(fieldType, ArrayType):
                # Replace the array column with its elements, keeping the name.
                nestedDF = nestedDF.withColumn(fieldName, explode_outer(fieldName))
            fieldNames = complexFields(nestedDF)
        return nestedDF
    except Exception as err:
        raise Exception("Error Occured at while flattening the dataframe : " + str(err)) from err
如果您不想展开(explode)数组列,可以删除其中的 ArrayType 检查分支。