在 Spark 的嵌套 XML 中为来自 parent 数据框的 child 数据框添加额外的列

Add extra column for child data frame from parent data frame in nested XML in Spark

我正在加载许多 XML 文件后创建数据。 每个 xml 文件都有一个唯一字段 fun:DataPartitionId 我正在从一个 XML 个文件创建多行。

现在我想为 XML 的结果行中的每一行添加此 fun:DataPartitionId

例如,假设第一个 XML 有 100 行,那么每 100 行将具有相同的 fun:DataPartitionId 字段。

所以 fun:DataPartitionId 在每个 XML 中作为 header 归档。

这就是我正在做的。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf


val getDataPartition =  udf { (DataPartition: String) => 
    if (DataPartition=="1") "SelfSourcedPublic"
    else  if (DataPartition=="2") "Japan"
    else  if (DataPartition=="3") "SelfSourcedPrivate"
    else "ThirdPartyPrivate"
}

val getFFActionParent =  udf { (FFAction: String) => 
    if (FFAction=="Insert") "I|!|"
    else if (FFAction=="Overwrite") "I|!|"
    else "D|!|" 
}

val getFFActionChild =  udf { (FFAction: String) => 
    if (FFAction=="Insert") "I|!|"
    else if (FFAction=="Overwrite") "O|!|"
    else "D|!|" 
}

val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfDataPartition=getDataPartition(dfContentEnvelope("env:Header.fun:DataPartitionId"))


val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val df =dfContentItem.withColumn("DataPartition",dfDataPartition)
df.show()

当您使用

阅读 xml 文件时
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")

DataParitionId 列被读取为 Long

fun:DataPartitionId: long (nullable = true)

所以您应该将 udf 函数更改为

val getDataPartition =  udf { (DataPartition: Long) =>
  if (DataPartition== 1) "SelfSourcedPublic"
  else  if (DataPartition== 2) "Japan"
  else  if (DataPartition== 3) "SelfSourcedPrivate"
  else "ThirdPartyPrivate"
}

如果可能,您应该使用 when 函数而不是 udf 函数来提高处理速度和内存使用量

Now I want to add this fun:DataPartitionId for each row in the resulting rows from the xml .

你的错误是你忘记了select那个特定的列,所以下面的代码

val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")

应该是

val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")

然后你可以应用udf函数

val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))

所以整个工作代码应该是

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf


val getDataPartition =  udf { (DataPartition: Long) => 
    if (DataPartition=="1") "SelfSourcedPublic"
    else  if (DataPartition=="2") "Japan"
    else  if (DataPartition=="3") "SelfSourcedPrivate"
    else "ThirdPartyPrivate"
}

val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")

val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
df.show(false)

您可以继续执行其余代码。