在 Spark 的嵌套 XML 中为来自 parent 数据框的 child 数据框添加额外的列
Add extra column for child data frame from parent data frame in nested XML in Spark
我正在加载许多 XML 文件后创建数据。
每个 xml 文件都有一个唯一字段 fun:DataPartitionId
我正在从一个 XML 个文件创建多行。
现在我想为 XML 的结果行中的每一行添加此 fun:DataPartitionId
。
例如,假设第一个 XML 有 100 行,那么每 100 行将具有相同的 fun:DataPartitionId
字段。
所以 fun:DataPartitionId
在每个 XML 中作为 header 归档。
这就是我正在做的。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val getDataPartition = udf { (DataPartition: String) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
val getFFActionParent = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "I|!|"
else "D|!|"
}
val getFFActionChild = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "O|!|"
else "D|!|"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfDataPartition=getDataPartition(dfContentEnvelope("env:Header.fun:DataPartitionId"))
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val df =dfContentItem.withColumn("DataPartition",dfDataPartition)
df.show()
当您使用
阅读 xml
文件时
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
DataParitionId
列被读取为 Long
fun:DataPartitionId: long (nullable = true)
所以您应该将 udf
函数更改为
val getDataPartition = udf { (DataPartition: Long) =>
if (DataPartition== 1) "SelfSourcedPublic"
else if (DataPartition== 2) "Japan"
else if (DataPartition== 3) "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
如果可能,您应该使用 when 函数而不是 udf 函数来提高处理速度和内存使用量
Now I want to add this fun:DataPartitionId for each row in the resulting rows from the xml .
你的错误是你忘记了select
那个特定的列,所以下面的代码
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
应该是
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
然后你可以应用udf
函数
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
所以整个工作代码应该是
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val getDataPartition = udf { (DataPartition: Long) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
df.show(false)
您可以继续执行其余代码。
我正在加载许多 XML 文件后创建数据。
每个 xml 文件都有一个唯一字段 fun:DataPartitionId
我正在从一个 XML 个文件创建多行。
现在我想为 XML 的结果行中的每一行添加此 fun:DataPartitionId
。
例如,假设第一个 XML 有 100 行,那么每 100 行将具有相同的 fun:DataPartitionId
字段。
所以 fun:DataPartitionId
在每个 XML 中作为 header 归档。
这就是我正在做的。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val getDataPartition = udf { (DataPartition: String) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
val getFFActionParent = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "I|!|"
else "D|!|"
}
val getFFActionChild = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "O|!|"
else "D|!|"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfDataPartition=getDataPartition(dfContentEnvelope("env:Header.fun:DataPartitionId"))
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val df =dfContentItem.withColumn("DataPartition",dfDataPartition)
df.show()
当您使用
阅读xml
文件时
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
DataParitionId
列被读取为 Long
fun:DataPartitionId: long (nullable = true)
所以您应该将 udf
函数更改为
val getDataPartition = udf { (DataPartition: Long) =>
if (DataPartition== 1) "SelfSourcedPublic"
else if (DataPartition== 2) "Japan"
else if (DataPartition== 3) "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
如果可能,您应该使用 when 函数而不是 udf 函数来提高处理速度和内存使用量
Now I want to add this fun:DataPartitionId for each row in the resulting rows from the xml .
你的错误是你忘记了select
那个特定的列,所以下面的代码
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
应该是
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
然后你可以应用udf
函数
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
所以整个工作代码应该是
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val getDataPartition = udf { (DataPartition: Long) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
df.show(false)
您可以继续执行其余代码。