Adding part of the parent schema columns to the child in nested JSON in a Spark data frame
I am trying to load the following XML into a Spark data frame.
<?xml version="1.0"?>
<env:ContentEnvelope xsi:schemaLocation="http">
<env:Header>
<env:Info>
<env:Id>urn:uuid:6d2af93bfbfc49da9805aebb6a38996d</env:Id>
<env:TimeStamp>20171122T07:56:09+00:00</env:TimeStamp>
</env:Info>
<fun:OrgId>18227</fun:OrgId>
<fun:DataPartitionId>1</fun:DataPartitionId>
</env:Header>
<env:Body minVers="0.0" majVers="1" contentSet="Fundamental">
<env:ContentItem action="Overwrite">
<env:Data xsi:type="sr:FinancialSourceDataItem">
<sr:Source sourceId="344" organizationId="4295906830">
<sr:FilingDateTime>20171111T17:00:00+00:00</sr:FilingDateTime>
<sr:SourceTypeCode>10K</sr:SourceTypeCode>
<sr:StatementDate>20171030T00:00:00+00:00</sr:StatementDate>
<sr:IsFilingDateTimeEstimated>false</sr:IsFilingDateTimeEstimated>
<sr:ContainsPreliminaryData>false</sr:ContainsPreliminaryData>
<sr:CapitalChangeAdjustmentDate>20171030T00:00:00+00:00</sr:CapitalChangeAdjustmentDate>
<sr:CumulativeAdjustmentFactor>1.00000</sr:CumulativeAdjustmentFactor>
<sr:ContainsRestatement>false</sr:ContainsRestatement>
<sr:FilingDateTimeUTCOffset>300</sr:FilingDateTimeUTCOffset>
<sr:ThirdPartySourceCode>SS</sr:ThirdPartySourceCode>
<sr:ThirdPartySourcePriority>1</sr:ThirdPartySourcePriority>
<sr:Auditors>
<sr:Auditor auditorId="3541">
<sr:AuditorOpinionCode>UNQ</sr:AuditorOpinionCode>
<sr:IsPlayingAuditorRole>true</sr:IsPlayingAuditorRole>
<sr:IsPlayingTaxAdvisorRole>false</sr:IsPlayingTaxAdvisorRole>
<sr:AuditorEnumerationId>3024068</sr:AuditorEnumerationId>
<sr:AuditorOpinionId>3010546</sr:AuditorOpinionId>
<sr:IsPlayingCSRAuditorRole>false</sr:IsPlayingCSRAuditorRole>
</sr:Auditor>
<sr:Auditor auditorId="9574">
<sr:AuditorOpinionCode>UWE</sr:AuditorOpinionCode>
<sr:IsPlayingAuditorRole>true</sr:IsPlayingAuditorRole>
<sr:IsPlayingTaxAdvisorRole>false</sr:IsPlayingTaxAdvisorRole>
<sr:AuditorEnumerationId>3030421</sr:AuditorEnumerationId>
<sr:AuditorOpinionId>3010547</sr:AuditorOpinionId>
<sr:IsPlayingCSRAuditorRole>false</sr:IsPlayingCSRAuditorRole>
</sr:Auditor>
</sr:Auditors>
<sr:SourceTypeId>3011835</sr:SourceTypeId>
<sr:ThirdPartySourceCodeId>1000716240</sr:ThirdPartySourceCodeId>
</sr:Source>
</env:Data>
</env:ContentItem>
</env:Body>
</env:ContentEnvelope>
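The main tag is <env:ContentEnvelope>.
It has two parts: a header (<env:Header>) and a body (<env:Body>).
The values of <fun:OrgId> and <fun:DataPartitionId> in the header are the same for all rows in <env:Body>.
From this I want to create two data frames: one for <sr:Source> and a second for <sr:Auditor>.
For both data frames, action="Overwrite" will be a common column with the same value.
Also, because <sr:Auditor> is a child of <sr:Source>, a few columns such as sourceId="344" and organizationId="4295906830" will be repeated in the <sr:Auditor> data frame.
This is what I have done so far to achieve this: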
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfHeader = dfContentEnvelope.withColumn("Header", (dfContentEnvelope("env:Header"))).select("Header.*")
val dfDataPartitionId =dfHeader.select("fun:DataPartitionId")
//dfDataPartitionId.show()
//val dfBody = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:Body").load("s3://trfsmallfffile/XML")
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val dfType=dfContentItem.select("env:Data.*")
//dfType.show()
val srSource = dfType.withColumn("srSource", (dfType("sr:Source"))).select("srSource.*").drop("sr:Auditors").filter($"srSource".isNotNull)
val srSourceAuditor = dfType.withColumn("srSource", explode(dfType("sr:Source.sr:Auditors.sr:Auditor"))).select("srSource.*")
So my question is: how do I get a parent data frame for <sr:Source> and a child data frame for <sr:Auditor>, with some columns carried over from the parent to the child data frame?
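If you want two data frames, one for Source and one for Auditors, with organizationId and sourceId taken from the Source data frame, then you can use the following logic.
Looking at the given data and your attempt, I suggest that the explode function on the env:Body.env:ContentItem column will give you the parent data frame: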
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml")
  .option("rowTag", "env:ContentEnvelope")
  .load("s3://trfsmallfffile/XML")
// One row per env:ContentItem
val dfContentItem = dfContentEnvelope
  .withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem")))
  .select("column1.*")
// XML attributes show up with a leading "_" (the spark-xml default attribute prefix)
val ParentDF = dfContentItem.select(
  $"env:Data.sr:Source._organizationId".as("organizationId"),
  $"env:Data.sr:Source._sourceId".as("sourceId"),
  $"env:Data.sr:Source".as("Source"))
This will give you
+--------------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|organizationId|sourceId|Source |
+--------------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|4295906830 |344 |[4295906830,344,[WrappedArray([3541,3024068,UNQ,3010546,true,false,false], [9574,3030421,UWE,3010547,true,false,false])],20171030T00:00:00+00:00,false,false,1.0,20171111T17:00:00+00:00,300,false,10K,3011835,20171030T00:00:00+00:00,SS,1000716240,1]|
+--------------+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
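The question also asks for action="Overwrite" as a common column, which the parent data frame above does not carry. A minimal sketch of one way to keep it, assuming the attribute is exposed as `_action` on each content item (the default `_` attribute prefix). The case classes, the `items` and `source` field names, and the `parentWithAction` helper are hypothetical, simplified stand-ins for the real inferred schema so the example runs on in-memory data with only Spark SQL on the classpath:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical, simplified stand-ins for the schema inferred from the XML.
case class SourceRow(_organizationId: Long, _sourceId: Long, code: String)
case class ContentItem(_action: String, source: SourceRow)
case class Envelope(items: Seq[ContentItem])

object ParentWithAction {
  // Explode the content items, then lift the action attribute and the
  // Source attributes to top-level columns next to the Source struct.
  def parentWithAction(envelopes: DataFrame): DataFrame =
    envelopes
      .select(explode(col("items")).as("item"))
      .select(
        col("item._action").as("action"),
        col("item.source._organizationId").as("organizationId"),
        col("item.source._sourceId").as("sourceId"),
        col("item.source").as("Source"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("parent-with-action").getOrCreate()
    import spark.implicits._

    // In-memory sample mirroring the values in the question.
    val envelopes = Seq(
      Envelope(Seq(ContentItem("Overwrite", SourceRow(4295906830L, 344L, "10K"))))
    ).toDF()

    parentWithAction(envelopes).show(false)
    spark.stop()
  }
}
```

With the real data the same idea would mean selecting the action column from dfContentItem alongside the Source columns; check printSchema first to confirm the actual column name.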
For the child data frame, you need to explode sr:Auditor from the parent data frame above:
val childDF=ParentDF.select($"organizationId", $"sourceId", explode($"Source.sr:Auditors.sr:Auditor").as("Auditors"))
which should give you
+--------------+--------+-------------------------------------------+
|organizationId|sourceId|Auditors |
+--------------+--------+-------------------------------------------+
|4295906830 |344 |[3541,3024068,UNQ,3010546,true,false,false]|
|4295906830 |344 |[9574,3030421,UWE,3010547,true,false,false]|
+--------------+--------+-------------------------------------------+
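The Auditors column above is still a struct. If you would rather have one column per auditor field next to the repeated parent keys, selecting `struct.*` after the explode flattens it. This is only a sketch on in-memory data: the `Auditor` and `Parent` case classes, their field names, and the `flattenAuditors` helper are hypothetical stand-ins, not the schema inferred from your file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical, simplified stand-ins for the parent schema.
case class Auditor(auditorId: Long, opinionCode: String)
case class Parent(organizationId: Long, sourceId: Long, auditors: Seq[Auditor])

object FlattenChild {
  // One output row per auditor; the parent keys are repeated on each row
  // and the auditor struct is expanded into top-level columns.
  def flattenAuditors(parent: DataFrame): DataFrame =
    parent
      .select(col("organizationId"), col("sourceId"),
        explode(col("auditors")).as("auditor"))
      .select(col("organizationId"), col("sourceId"), col("auditor.*"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("flatten-child").getOrCreate()
    import spark.implicits._

    // In-memory sample mirroring the two auditors in the question.
    val parentDF = Seq(
      Parent(4295906830L, 344L, Seq(Auditor(3541L, "UNQ"), Auditor(9574L, "UWE")))
    ).toDF()

    flattenAuditors(parentDF).show(false)
    spark.stop()
  }
}
```

For the sample above this yields two rows with the columns organizationId, sourceId, auditorId and opinionCode.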
I hope the answer is helpful.