为所有列名动态循环数据集
Dynamically loop a dataset for all column names
我正在做的项目有大约 500 个列名,但我需要在每个 table 名称上应用 coalesce
函数。
df1
架构
-id
-col1
...
-col500
df2
架构
-id
-col1
...
-col500
Dataset<Row> newDS= df1.join(df2, "id")
.select(
df1.col("id"),
functions.coalesce(df1.col("col1"),df2.col("col1")).as("col1"),
functions.coalesce(df1.col("col2"),df2.col("col2")).as("col2"),
...
functions.coalesce(df1.col("col500"),df2.col("col500")).as("col500"),
)
.show();
我试过的
Dataset<Row> j1 = df1.join(df2, "id");
Dataset<Row> gh1 = spark.emptyDataFrame();
String[] f = df1.columns();
for(String h : f)
{
if(h == "id")
gh1 = j1.select(df1.col("id"));
else{
gh1 = j1.select(functions.coalesce(df1.col(h),df2.col(h)).as(h));
}
}
gh1.show();
如果我没理解错的话,你有两个具有相同架构的数据框,你想将它们的 500 列 2 乘 2 合并,而不必编写所有内容。
这可以通过向 select
提供一系列列来轻松实现。此外,由于 select
不接受列序列,而是接受可变数量的列参数,因此您需要添加 : _*
让 scala 知道它需要将序列的所有元素视为单独的参数。
val cols = df1.columns.filter(_ != "id")
df1
.join(df2, "id")
.select(col("id") +: cols.map(n => coalesce(df1.col(n), df2.col(n)) as n) : _* )
在 Java 中,您可以将值数组传递给需要可变数量参数的方法,因此您可以像这样重写代码:
Column[] coalescedColumns = Stream.of(df1.columns())
.map(name -> functions.coalesce(df1.col(name),df2.col(name)).as(name))
.toArray(Column[]::new);
Dataset<Row> newDS = df1.join(df2, "id").select(coalescedColumns)
我没有排除 id
列,因为合并也将按预期在此列上工作
df1.columns
将 returns 字符串数组,因此无法调用其上的流,.
Column[] coalescedColumns =
Stream.of(df1.columns())
.map(name -> functions.coalesce(df1.col(name),df2.col(name)).as(name))
.toArray(Column[]::new);
Dataset<Row> newDS = df1.as("a").join(df2.as("b")).where("a.id == b.id").select(coalescedColumns);
我正在做的项目有大约 500 个列名,但我需要在每个 table 名称上应用 coalesce
函数。
df1
架构
-id
-col1
...
-col500
df2
架构
-id
-col1
...
-col500
Dataset<Row> newDS= df1.join(df2, "id")
.select(
df1.col("id"),
functions.coalesce(df1.col("col1"),df2.col("col1")).as("col1"),
functions.coalesce(df1.col("col2"),df2.col("col2")).as("col2"),
...
functions.coalesce(df1.col("col500"),df2.col("col500")).as("col500"),
)
.show();
我试过的
Dataset<Row> j1 = df1.join(df2, "id");
Dataset<Row> gh1 = spark.emptyDataFrame();
String[] f = df1.columns();
for(String h : f)
{
if(h == "id")
gh1 = j1.select(df1.col("id"));
else{
gh1 = j1.select(functions.coalesce(df1.col(h),df2.col(h)).as(h));
}
}
gh1.show();
如果我没理解错的话,你有两个具有相同架构的数据框,你想将它们的 500 列 2 乘 2 合并,而不必编写所有内容。
这可以通过向 select
提供一系列列来轻松实现。此外,由于 select
不接受列序列,而是接受可变数量的列参数,因此您需要添加 : _*
让 scala 知道它需要将序列的所有元素视为单独的参数。
val cols = df1.columns.filter(_ != "id")
df1
.join(df2, "id")
.select(col("id") +: cols.map(n => coalesce(df1.col(n), df2.col(n)) as n) : _* )
在 Java 中,您可以将值数组传递给需要可变数量参数的方法,因此您可以像这样重写代码:
Column[] coalescedColumns = Stream.of(df1.columns())
.map(name -> functions.coalesce(df1.col(name),df2.col(name)).as(name))
.toArray(Column[]::new);
Dataset<Row> newDS = df1.join(df2, "id").select(coalescedColumns)
我没有排除 id
列,因为合并也将按预期在此列上工作
df1.columns
将 returns 字符串数组,因此无法调用其上的流,
Column[] coalescedColumns =
Stream.of(df1.columns())
.map(name -> functions.coalesce(df1.col(name),df2.col(name)).as(name))
.toArray(Column[]::new);
Dataset<Row> newDS = df1.as("a").join(df2.as("b")).where("a.id == b.id").select(coalescedColumns);