具有不同列的两个 Spark 数据帧的联合
Union of two Spark dataframes with different columns
我正在尝试合并两个具有不同列集的 Spark 数据帧。为此,我参考了以下 link :-
我的代码如下 -
val cols1 = finalDF.columns.toSet
val cols2 = df.columns.toSet
val total = cols1 ++ cols2
finalDF=finalDF.select(expr(cols1, total):_*).unionAll(df.select(expr(cols2, total):_*))
def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}
但我面临的问题是两个数据框中的某些列是嵌套的。我有 StructType 和原始类型的列。现在,假设 A 列(属于 StructType)在 df 中而不是在 finalDF 中。但是在表达式中,
case _ => lit(null).as(x)
未将其设为 StructType。这就是为什么我无法将它们结合起来。它给我以下错误 -
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. NullType <> StructType(StructField(_VALUE,StringType,true), StructField(_id,LongType,true)) at the first column of the second table.
有什么建议我可以在这里做什么吗?
我会为此使用内置模式推理。它 更昂贵,但比匹配复杂结构简单得多,可能存在冲突:
spark.read.json(df1.toJSON.union(df2.toJSON))
您也可以同时导入所有文件,并且join
使用从文件头中提取的信息,使用input_file_name
。
import org.apache.spark.sql.function
val metadata: DataFrame // Just metadata from the header
val data: DataFrame // All files loaded together
metadata.withColumn("file", input_file_name)
.join(data.withColumn("file", input_file_name), Seq("file"))
df = df1.join(df2, ['each', 'shared', 'column'], how='full')
将用空值填充缺失数据。
我正在尝试合并两个具有不同列集的 Spark 数据帧。为此,我参考了以下 link :-
我的代码如下 -
val cols1 = finalDF.columns.toSet
val cols2 = df.columns.toSet
val total = cols1 ++ cols2
finalDF=finalDF.select(expr(cols1, total):_*).unionAll(df.select(expr(cols2, total):_*))
def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}
但我面临的问题是两个数据框中的某些列是嵌套的。我有 StructType 和原始类型的列。现在,假设 A 列(属于 StructType)在 df 中而不是在 finalDF 中。但是在表达式中,
case _ => lit(null).as(x)
未将其设为 StructType。这就是为什么我无法将它们结合起来。它给我以下错误 -
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. NullType <> StructType(StructField(_VALUE,StringType,true), StructField(_id,LongType,true)) at the first column of the second table.
有什么建议我可以在这里做什么吗?
我会为此使用内置模式推理。它 更昂贵,但比匹配复杂结构简单得多,可能存在冲突:
spark.read.json(df1.toJSON.union(df2.toJSON))
您也可以同时导入所有文件,并且join
使用从文件头中提取的信息,使用input_file_name
。
import org.apache.spark.sql.function
val metadata: DataFrame // Just metadata from the header
val data: DataFrame // All files loaded together
metadata.withColumn("file", input_file_name)
.join(data.withColumn("file", input_file_name), Seq("file"))
df = df1.join(df2, ['each', 'shared', 'column'], how='full')
将用空值填充缺失数据。