How to union two dataframes which have the same number of columns?

Dataframe df1 has the columns: a, b, c, d, e (empty dataframe)

Dataframe df2 has the columns: b, c, d, e, _c4 (contains data)

I want to union these two dataframes. I tried using

df1.union(df2);

This fills the data by position, but I want it filled by column name.

Then I tried

df1.unionByName(df2, allowMissingColumns= true);

But it throws an error at `allowMissingColumns = true`. I know this error is due to the version; I am using Spark version 2.4.4.

df1:

|a|b|c|d|e|
+---------+
| | | | | |
+---------+

df2:

|b|c|d|e|_c4|
+-----------+
|2|3|5|6|   | 
+-----------+

Expected output:

|a|b|c|d|e|
+---------+
| |2|3|5|6| 
+---------+

My question is: is there another way to overwrite the empty dataframe (df1) with the filled dataframe (df2) by column name? Or do I need to change the version in my pom.xml file? Please suggest something.

Pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>rule</groupId>
  <artifactId>qwerty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>qwerty</name>
  <description>code</description>
  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

        
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>4.0.0</version>
        </dependency>

   </dependencies>
   <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <outputDirectory>${project.build.directory}</outputDirectory>
                    <archive>
                        <manifest>
                            <mainClass>qwerty.qwerty</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin> 
                <artifactId>maven-compiler-plugin</artifactId> 
                <configuration> <source>1.8</source> <target>1.8</target> </configuration> 
            </plugin>
        </plugins>
    </build>
</project>

unionByName has existed since Spark 2.3, but allowMissingColumns only appeared in Spark 3.1, hence the error you get in 2.4.

In Spark 2.4, you can try to implement the same behavior yourself. That is, transform df2 so that it contains all the columns of df1. If a column is not in df2, we can set it to null. In Scala, you could do it this way:

import org.apache.spark.sql.functions.{col, lit}

val df2_as1 = df2
    .select(df1
        .columns
        .map(c => if(df2.columns.contains(c)) col(c) else lit(null).as(c))
    : _*)
// Here, union would work just as well.
val result = df1.unionByName(df2_as1)

In Java, it is admittedly more painful:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import scala.collection.JavaConverters;

List<String> df2_cols = Arrays.asList(df2.columns());
// cols is the list of columns contained in df1, but all columns
// that are not in df2 are set to null.
List<Column> cols = new ArrayList<>();
for (String c : df1.columns()) {
    if(df2_cols.contains(c))
          cols.add(functions.col(c));
    else
          cols.add(functions.lit(null).alias(c));
}
// We modify df2 so that its schema matches df1's.
Dataset<Row> df2_as1 = df2.select(JavaConverters.asScalaBuffer(cols).toSeq());
        
// Here, union would work just as well.
Dataset<Row> result = df1.unionByName(df2_as1);
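The core of the workaround is the name-alignment step, which is plain list logic and can be checked without a SparkSession. Below is a small, hypothetical helper (`AlignColumns` is not part of any Spark API) that computes, for each column of df1's schema, whether the value comes from df2 or is filled with null; the Spark code above is just this decision applied with `functions.col` and `functions.lit`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: mirrors the column-alignment logic of the Java
// snippet above, but returns the chosen expressions as plain strings so
// it can be inspected without a SparkSession.
public class AlignColumns {
    public static List<String> alignExpressions(String[] targetCols, String[] sourceCols) {
        List<String> source = Arrays.asList(sourceCols);
        List<String> exprs = new ArrayList<>();
        for (String c : targetCols) {
            if (source.contains(c))
                exprs.add("col(" + c + ")");          // column exists in df2: take it as-is
            else
                exprs.add("lit(null) AS " + c);       // column missing in df2: fill with null
        }
        return exprs;
    }

    public static void main(String[] args) {
        String[] df1Cols = {"a", "b", "c", "d", "e"};
        String[] df2Cols = {"b", "c", "d", "e", "_c4"};
        // "a" is absent from df2, so it is filled with null;
        // "_c4" is dropped because it is not part of df1's schema.
        System.out.println(alignExpressions(df1Cols, df2Cols));
    }
}
```

Note that any column of df2 not named in df1 (here `_c4`) is silently dropped by the `select`, which matches the expected output in the question.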