数据框中的新列源自第二个数据框
new column in dataframe derived from second dataframe
我有两个数据框 df1 和 df2.I' 从 df2 在 df1 中添加新列:
df1
X Y Z
1 2 3
4 5 6
7 8 9
3 6 9
df2
col1 col2
XX aa
YY bb
XX cc
ZZ vv
df2 中 col1 的值应添加为 df1 中的新列(如果它不存在)和 col2 作为新 column.For 示例的值:
df1
X Y Z XX YY ZZ
1 2 3 aa bb vv
4 5 6 cc
7 8 9
3 6 9
df2
col1 col2
XX aa
YY bb
XX cc
ZZ vv
首先,将spark数据集做成分布式。但是列名是模式的一部分,所以它们在主人的记忆中。因此,要为 df2.col1
的每个不同值添加列,您首先需要在 master 中获取这些值(即收集)
// inputs
val df1 = List((1,2,3), (4,5,6), (7,8,9), (3,6,9)).toDF("X", "Y", "Z")
val df2 = List(("XX", "aa"), ("YY", "bb"), ("XX", "cc"), ("ZZ", "vv")).toDF("col1", "col2")
val newColumns = df2.select("col1").as[String].distinct.collect
val newDF = newColumns.foldLeft(df1)( (df, col) => df.withColumn(col, lit("?")))
newDF.show
+---+---+---+---+---+---+
| X| Y| Z| ZZ| YY| XX|
+---+---+---+---+---+---+
| 1| 2| 3| ?| ?| ?|
| 4| 5| 6| ?| ?| ?|
| 7| 8| 9| ?| ?| ?|
| 3| 6| 9| ?| ?| ?|
+---+---+---+---+---+---+
但是
- 我不知道你想在这些列中输入什么值(上面,我在所有地方都输入了“?”)
- 如果df2的行很多,比如10的千行,可以kill master收集起来全部加到df1
现在,再多说一点,下面介绍如何从 df2.col1 添加列并将 df2.col2 的串联值作为值
val toAdd = df2.groupBy("col1").agg(concat_ws(",", collect_set("col2")).as("col2All"))
toAdd.show
+----+-------+
|col1|col2All|
+----+-------+
| ZZ| vv|
| YY| bb|
| XX| cc,aa|
+----+-------+
val newColumns = toAdd.rdd.map(r => (r.getAs[String]("col1"), r.getAs[String]("col2All"))).collectAsMap()
val newDF = newColumns.foldLeft(df1){ case (df, (name, value)) => df.withColumn(name, lit(value))}
newDF.show
+---+---+---+-----+---+---+
| X| Y| Z| XX| YY| ZZ|
+---+---+---+-----+---+---+
| 1| 2| 3|cc,aa| bb| vv|
| 4| 5| 6|cc,aa| bb| vv|
| 7| 8| 9|cc,aa| bb| vv|
| 3| 6| 9|cc,aa| bb| vv|
+---+---+---+-----+---+---+
我有两个数据框 df1 和 df2.I' 从 df2 在 df1 中添加新列:
df1
X Y Z
1 2 3
4 5 6
7 8 9
3 6 9
df2
col1 col2
XX aa
YY bb
XX cc
ZZ vv
df2 中 col1 的值应添加为 df1 中的新列(如果它不存在)和 col2 作为新 column.For 示例的值:
df1
X Y Z XX YY ZZ
1 2 3 aa bb vv
4 5 6 cc
7 8 9
3 6 9
df2
col1 col2
XX aa
YY bb
XX cc
ZZ vv
首先,将spark数据集做成分布式。但是列名是模式的一部分,所以它们在主人的记忆中。因此,要为 df2.col1
的每个不同值添加列,您首先需要在 master 中获取这些值(即收集)
// inputs
val df1 = List((1,2,3), (4,5,6), (7,8,9), (3,6,9)).toDF("X", "Y", "Z")
val df2 = List(("XX", "aa"), ("YY", "bb"), ("XX", "cc"), ("ZZ", "vv")).toDF("col1", "col2")
val newColumns = df2.select("col1").as[String].distinct.collect
val newDF = newColumns.foldLeft(df1)( (df, col) => df.withColumn(col, lit("?")))
newDF.show
+---+---+---+---+---+---+
| X| Y| Z| ZZ| YY| XX|
+---+---+---+---+---+---+
| 1| 2| 3| ?| ?| ?|
| 4| 5| 6| ?| ?| ?|
| 7| 8| 9| ?| ?| ?|
| 3| 6| 9| ?| ?| ?|
+---+---+---+---+---+---+
但是
- 我不知道你想在这些列中输入什么值(上面,我在所有地方都输入了“?”)
- 如果df2的行很多,比如10的千行,可以kill master收集起来全部加到df1
现在,再多说一点,下面介绍如何从 df2.col1 添加列并将 df2.col2 的串联值作为值
val toAdd = df2.groupBy("col1").agg(concat_ws(",", collect_set("col2")).as("col2All"))
toAdd.show
+----+-------+
|col1|col2All|
+----+-------+
| ZZ| vv|
| YY| bb|
| XX| cc,aa|
+----+-------+
val newColumns = toAdd.rdd.map(r => (r.getAs[String]("col1"), r.getAs[String]("col2All"))).collectAsMap()
val newDF = newColumns.foldLeft(df1){ case (df, (name, value)) => df.withColumn(name, lit(value))}
newDF.show
+---+---+---+-----+---+---+
| X| Y| Z| XX| YY| ZZ|
+---+---+---+-----+---+---+
| 1| 2| 3|cc,aa| bb| vv|
| 4| 5| 6|cc,aa| bb| vv|
| 7| 8| 9|cc,aa| bb| vv|
| 3| 6| 9|cc,aa| bb| vv|
+---+---+---+-----+---+---+