如何合并具有相同列数的两个数据框?
How to union two dataframes which have same number of columns?
数据框 df1 包含列:a、b、c、d、e(空数据框)
Dataframe df2 包含列:b、c、d、e、_c4(包含数据)
我想对这两个数据帧进行联合。我尝试使用
df1.union(df2);
这会用位置填充数据。但我想用列名填充数据。
然后我尝试了
df1.unionByName(df2, allowMissingColumns= true);
但是它在 ``allowMissingColumns= true` 中抛出错误。我知道这是因为版本的错误。我使用 spark 版本 2.4.4.
df1:
|a|b|c|d|e|
+---------+
| | | | | |
+---------+
df2:
|b|c|d|e|_c4|
+-----------+
|2|3|5|6| |
+-----------+
预期输出:
|a|b|c|d|e|
+---------+
| |2|3|5|6|
+---------+
我的问题是有没有其他方法可以使用列名用填充的数据框 (df2) 覆盖空数据框 (df1)?还是我需要更改 pom.xml 文件中的版本?
请提出一些建议。
Pom 文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rule</groupId>
<artifactId>qwerty</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>qwerty</name>
<description>code</description>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<archive>
<manifest>
<mainClass>qwerty.qwerty</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration> <source>1.8</source> <target>1.8</target> </configuration>
</plugin>
</plugins>
</build>
</project>
unionByName
自 spark 2.3
以来就存在,但 allowMissingColumns
仅出现在 spark 3.1
中,因此您在 2.4
.[=21 中获得错误=]
在 spark 2.4
中,您可以尝试自己实现相同的行为。也就是说,转换 df2
以使其包含 df1
中的所有列。如果某列不在 df2
中,我们可以将其设置为空。在 scala 中,你可以这样做:
val df2_as1 = df2
.select(df1
.columns
.map(c => if(df2.columns.contains(c)) col(c) else lit(null).as(c))
: _*)
// Here, union would work just as well.
val result = df1.unionByName(df2_as1)
在java中,那显然更痛苦:
List<String> df2_cols = Arrays.asList(df2.columns());
// cols is the list of columns contained in df1, but all columns
// that are not in df2 are set to null.
List<Column> cols = new ArrayList<>();
for (String c : df1.columns()) {
if(df2_cols.contains(c))
cols.add(functions.col(c));
else
cols.add(functions.lit(null).alias(c));
}
// We modify df2 so that its schema matches df1's.
Dataset<Row> df2_as1 = df2.select(JavaConverters.asScalaBuffer(cols).toSeq());
// Here, union would work just as well.
Dataset<Row> result = df1.unionByName(df2_as1);
数据框 df1 包含列:a、b、c、d、e(空数据框)
Dataframe df2 包含列:b、c、d、e、_c4(包含数据)
我想对这两个数据帧进行联合。我尝试使用
df1.union(df2);
这会用位置填充数据。但我想用列名填充数据。
然后我尝试了
df1.unionByName(df2, allowMissingColumns= true);
但是它在 ``allowMissingColumns= true` 中抛出错误。我知道这是因为版本的错误。我使用 spark 版本 2.4.4.
df1:
|a|b|c|d|e|
+---------+
| | | | | |
+---------+
df2:
|b|c|d|e|_c4|
+-----------+
|2|3|5|6| |
+-----------+
预期输出:
|a|b|c|d|e|
+---------+
| |2|3|5|6|
+---------+
我的问题是有没有其他方法可以使用列名用填充的数据框 (df2) 覆盖空数据框 (df1)?还是我需要更改 pom.xml 文件中的版本? 请提出一些建议。
Pom 文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>rule</groupId>
<artifactId>qwerty</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>qwerty</name>
<description>code</description>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<archive>
<manifest>
<mainClass>qwerty.qwerty</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration> <source>1.8</source> <target>1.8</target> </configuration>
</plugin>
</plugins>
</build>
</project>
unionByName
自 spark 2.3
以来就存在,但 allowMissingColumns
仅出现在 spark 3.1
中,因此您在 2.4
.[=21 中获得错误=]
在 spark 2.4
中,您可以尝试自己实现相同的行为。也就是说,转换 df2
以使其包含 df1
中的所有列。如果某列不在 df2
中,我们可以将其设置为空。在 scala 中,你可以这样做:
val df2_as1 = df2
.select(df1
.columns
.map(c => if(df2.columns.contains(c)) col(c) else lit(null).as(c))
: _*)
// Here, union would work just as well.
val result = df1.unionByName(df2_as1)
在java中,那显然更痛苦:
List<String> df2_cols = Arrays.asList(df2.columns());
// cols is the list of columns contained in df1, but all columns
// that are not in df2 are set to null.
List<Column> cols = new ArrayList<>();
for (String c : df1.columns()) {
if(df2_cols.contains(c))
cols.add(functions.col(c));
else
cols.add(functions.lit(null).alias(c));
}
// We modify df2 so that its schema matches df1's.
Dataset<Row> df2_as1 = df2.select(JavaConverters.asScalaBuffer(cols).toSeq());
// Here, union would work just as well.
Dataset<Row> result = df1.unionByName(df2_as1);