Cannot resolve explode due to type mismatch in Spark while parsing XML file
I have a DataFrame with the following schema:
root
|-- DataPartition: long (nullable = true)
|-- TimeStamp: string (nullable = true)
|-- _organizationId: long (nullable = true)
|-- _segmentId: long (nullable = true)
|-- seg:BusinessSegments: struct (nullable = true)
| |-- seg:BusinessSegment: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- _VALUE: string (nullable = true)
| | | |-- _hierarchicalCode: long (nullable = true)
| | | |-- _industryId: long (nullable = true)
| | | |-- _ranking: long (nullable = true)
|-- seg:GeographicSegments: struct (nullable = true)
| |-- seg:GeographicSegment: struct (nullable = true)
| | |-- _geographyId: long (nullable = true)
| | |-- seg:IsSubtracted: boolean (nullable = true)
| | |-- seg:Sequence: long (nullable = true)
|-- seg:IsCorporate: boolean (nullable = true)
|-- seg:IsElimination: boolean (nullable = true)
|-- seg:IsOperatingSegment: boolean (nullable = true)
|-- seg:IsOther: boolean (nullable = true)
|-- seg:IsShariaCompliant: boolean (nullable = true)
|-- seg:PredecessorSegments: struct (nullable = true)
| |-- seg:PredecessorSegment: long (nullable = true)
|-- seg:SegmentLocalLanguageLabel: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _languageId: long (nullable = true)
|-- seg:SegmentName: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _languageId: long (nullable = true)
|-- seg:SegmentType: string (nullable = true)
|-- seg:SegmentTypeId: long (nullable = true)
|-- seg:ValidFromPeriodEndDate: string (nullable = true)
|-- _action: string (nullable = true)
Now I want to get the seg:BusinessSegments.seg:BusinessSegment values from the schema.
But my problem is that when I do this using explode:
val GeographicSegmentchildDF = parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:GeographicSegments.seg:GeographicSegment").as("GeographicSegments"), $"_action")
val GeographicSegmentchildArrayDF = GeographicSegmentchildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SegmentId"), $"GeographicSegments.*", getFFActionChild($"_action").as("FFAction|!|"))
So in the first line I explode the column, and in the next line I expand it with * ($"GeographicSegments.*").
That is what I'm doing, and I get an error like this:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(seg:GeographicSegments.seg:GeographicSegment)' due to data type mismatch:
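I know the cause: in the schema, seg:GeographicSegment is a struct rather than an array, which is why I get this error.
So the real problem is that I don't have a fixed schema.
When the XML file contains two records, seg:GeographicSegment becomes an array and my code works fine, but when there is only one record it is inferred as a struct and my code fails.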
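To reproduce the failure without spark-xml, here is a minimal sketch that builds both shapes by hand: seg:GeographicSegment as a single struct (what the one-record file yields) and as an array of structs (the two-record file). The object name, the local SparkSession and the sample values are hypothetical; only the column and field names come from the schema above.

```scala
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object ExplodeShapeRepro {
  // Local session for this sketch only; a real job already has one.
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("explode-shape-repro").getOrCreate()

  // One GeographicSegment as a struct, using the field names from the schema.
  private def segment(id: Long, seq: Long): Column =
    struct(lit(id).as("_geographyId"), lit(false).as("seg:IsSubtracted"), lit(seq).as("seg:Sequence"))

  // One-record shape: seg:GeographicSegment is a struct.
  def structShaped(): DataFrame =
    spark.range(1).select(
      struct(segment(10L, 1L).as("seg:GeographicSegment")).as("seg:GeographicSegments"))

  // Two-record shape: seg:GeographicSegment is an array of structs.
  def arrayShaped(): DataFrame =
    spark.range(1).select(
      struct(array(segment(10L, 1L), segment(20L, 2L)).as("seg:GeographicSegment")).as("seg:GeographicSegments"))

  // Right(row count) if explode resolves, Left(message) if analysis rejects it.
  def tryExplode(df: DataFrame): Either[String, Long] =
    try Right(df.select(explode(col("seg:GeographicSegments.seg:GeographicSegment"))).count())
    catch { case e: AnalysisException => Left(e.getMessage) }
}
```

On the array-shaped frame explode resolves and yields two rows; on the struct-shaped frame Spark rejects the query during analysis with an AnalysisException, the same kind of error shown above.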
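How can I handle this in my code? Do I have to add a condition while parsing the schema? Or is there any way I
Here is one of the cases that doesn't work: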
val columnTypePredecessorSegments = parentDF.select($"seg:PredecessorSegments.seg:PredecessorSegment").schema.map(_.dataType).head.toString().startsWith("LongType")
//if column type is long then wrap it with array() before exploding, else it is already an array so explode it directly
val PredecessorSegmentschildDF = if (columnTypePredecessorSegments) {
parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode(array($"seg:PredecessorSegments.seg:PredecessorSegment")).as("PredecessorSegments"), $"_action")
} else {
parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:PredecessorSegments.seg:PredecessorSegment").as("PredecessorSegments"), $"_action")
}
val PredecessorSegmentsDFFinalChilddDF = PredecessorSegmentschildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SuccessorSegment"), $"PredecessorSegments.*", getFFActionChild($"_action").as("FFAction|!|"))
PredecessorSegmentsDFFinalChilddDF.show(false)
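The error for this case isn't shown, but a likely cause is the second select rather than the type check: after exploding, PredecessorSegments holds a plain long, and `.*` can only expand struct columns, so `$"PredecessorSegments.*"` cannot resolve in either branch. A minimal sketch, assuming you only need the predecessor ids as a column: match on the DataType instead of comparing type names as strings, wrap a single value in `array()`, and select the exploded column directly. The helper `explodeLongOrArray`, the object name and the sample data are hypothetical.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.ArrayType

object PredecessorSketch {
  // Local session for this sketch only.
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("predecessor-sketch").getOrCreate()

  // Hypothetical helper: explode a column that may hold one long or an array of longs.
  def explodeLongOrArray(df: DataFrame, path: String): Column =
    df.select(col(path)).schema.head.dataType match {
      case _: ArrayType => explode(col(path))        // several records: already an array
      case _            => explode(array(col(path))) // one record: wrap the single value
    }

  // The exploded value is a long, not a struct, so select it directly instead of ".*".
  def predecessors(df: DataFrame): DataFrame =
    df.select(
      col("_segmentId").as("SuccessorSegment"),
      explodeLongOrArray(df, "seg:PredecessorSegments.seg:PredecessorSegment").as("PredecessorSegment"))

  // Hypothetical samples mirroring the one-record and two-record shapes.
  def singleValueSample(): DataFrame =
    spark.range(1).select(
      lit(7L).as("_segmentId"),
      struct(lit(3L).as("seg:PredecessorSegment")).as("seg:PredecessorSegments"))

  def arraySample(): DataFrame =
    spark.range(1).select(
      lit(7L).as("_segmentId"),
      struct(array(lit(3L), lit(4L)).as("seg:PredecessorSegment")).as("seg:PredecessorSegments"))
}
```

In the question's code, the corresponding change would be to select `$"PredecessorSegments"` instead of `$"PredecessorSegments.*"` in the final select.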
So you need to check the column's data type before using explode:
//checking for struct or array type in that column
val columnType = parentDF.select($"seg:GeographicSegments.seg:GeographicSegment").schema.map(_.dataType).head.toString().startsWith("StructType")
import org.apache.spark.sql.functions._
//if the column is a struct, wrap the whole struct in array() so explode gets a one-element array; otherwise it is already an array, so explode it directly
val GeographicSegmentchildDF = if (columnType) {
parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode(array($"seg:GeographicSegments.seg:GeographicSegment")).as("GeographicSegments"), $"_action")
} else {
parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:GeographicSegments.seg:GeographicSegment").as("GeographicSegments"), $"_action")
}
val GeographicSegmentchildArrayDF = GeographicSegmentchildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SegmentId"), $"GeographicSegments.*", getFFActionChild($"_action").as("FFAction|!|"))
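The `toString().startsWith("StructType")` check works, but matching on the DataType itself is less brittle and lets one helper cover both shapes. A minimal sketch, assuming the only shapes spark-xml infers here are a single struct or an array of structs; `explodeAsStructs`, the object name and the sample are hypothetical. The struct branch wraps the whole struct column in `array()`, so explode produces one struct per row and `.*` can expand its fields afterwards.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StructType}

object SegmentExplode {
  // Local session for this sketch only.
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("segment-explode").getOrCreate()

  // Hypothetical helper: explode `path` into one struct per row whether it was inferred
  // as a single struct or as an array of structs, then flatten the struct's fields.
  // `keep` lists the top-level columns to carry along unchanged.
  def explodeAsStructs(df: DataFrame, path: String, keep: Seq[String]): DataFrame = {
    val asArray = df.select(col(path)).schema.head.dataType match {
      case ArrayType(_: StructType, _) => col(path)        // several records: already an array
      case _: StructType               => array(col(path)) // one record: wrap the whole struct
      case other =>
        throw new IllegalArgumentException(s"Unexpected type for $path: ${other.simpleString}")
    }
    df.select(keep.map(col) :+ explode(asArray).as("exploded"): _*)
      .select(keep.map(col) :+ col("exploded.*"): _*)
  }

  // Hypothetical one-record sample: seg:GeographicSegment is a single struct.
  def singleSegmentSample(): DataFrame =
    spark.range(1).select(
      lit(7L).as("_segmentId"),
      struct(
        struct(lit(10L).as("_geographyId"), lit(false).as("seg:IsSubtracted"), lit(1L).as("seg:Sequence"))
          .as("seg:GeographicSegment")
      ).as("seg:GeographicSegments"))
}
```

Against the question's frame, a call such as `explodeAsStructs(parentDF, "seg:GeographicSegments.seg:GeographicSegment", Seq("DataPartition", "TimeStamp", "_organizationId", "_segmentId", "_action"))` would replace both branches; the existing UDFs (getDataPartition, getFFActionChild) could then be applied to the result as before.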
Hope this answer helps!