数据框中的新列源自第二个数据框

new column in dataframe derived from second dataframe

我有两个数据框 df1 和 df2.I' 从 df2 在 df1 中添加新列:

     df1          
    X Y Z      
    1 2 3        
    4 5 6        
    7 8 9        
    3 6 9       
    
    df2
   col1 col2
   XX    aa
   YY    bb
   XX    cc
   ZZ    vv

df2 中 col1 的值应添加为 df1 中的新列(如果它不存在)和 col2 作为新 column.For 示例的值:

    df1              
    X Y Z XX  YY  ZZ   
    1 2 3  aa  bb vv    
    4 5 6  cc           
    7 8 9               
    3 6 9               

     df2
   col1 col2
   XX    aa
   YY    bb
   XX    cc
   ZZ    vv

首先,将spark数据集做成分布式。但是列名是模式的一部分,所以它们在主人的记忆中。因此,要为 df2.col1 的每个不同值添加列,您首先需要在 master 中获取这些值(即收集)

// inputs
val df1 = List((1,2,3), (4,5,6), (7,8,9), (3,6,9)).toDF("X", "Y", "Z")
val df2 = List(("XX", "aa"), ("YY", "bb"), ("XX", "cc"), ("ZZ", "vv")).toDF("col1", "col2")

val newColumns = df2.select("col1").as[String].distinct.collect

val newDF = newColumns.foldLeft(df1)( (df, col) => df.withColumn(col, lit("?")))
newDF.show

+---+---+---+---+---+---+
|  X|  Y|  Z| ZZ| YY| XX|
+---+---+---+---+---+---+
|  1|  2|  3|  ?|  ?|  ?|
|  4|  5|  6|  ?|  ?|  ?|
|  7|  8|  9|  ?|  ?|  ?|
|  3|  6|  9|  ?|  ?|  ?|
+---+---+---+---+---+---+

但是

  • 我不知道你想在这些列中输入什么值(上面,我在所有地方都输入了“?”)
  • 如果df2的行很多,比如10的千行,可以kill master收集起来全部加到df1

现在,再多说一点,下面介绍如何从 df2.col1 添加列并将 df2.col2 的串联值作为值

val toAdd = df2.groupBy("col1").agg(concat_ws(",", collect_set("col2")).as("col2All"))
toAdd.show

+----+-------+
|col1|col2All|
+----+-------+
|  ZZ|     vv|
|  YY|     bb|
|  XX|  cc,aa|
+----+-------+

val newColumns = toAdd.rdd.map(r => (r.getAs[String]("col1"), r.getAs[String]("col2All"))).collectAsMap()

val newDF = newColumns.foldLeft(df1){ case (df, (name, value)) => df.withColumn(name, lit(value))}
newDF.show

+---+---+---+-----+---+---+
|  X|  Y|  Z|   XX| YY| ZZ|
+---+---+---+-----+---+---+
|  1|  2|  3|cc,aa| bb| vv|
|  4|  5|  6|cc,aa| bb| vv|
|  7|  8|  9|cc,aa| bb| vv|
|  3|  6|  9|cc,aa| bb| vv|
+---+---+---+-----+---+---+