如何为第一个数据框中匹配的特定列值的所有值获取第二个数据框的数据?

How to get data of second data frame for all values of particular columns values matched in first dataframe?

有两个数据框如下

first_df
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- min_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

second_df 
 |-- company_id: string (nullable = true)
 |-- max_dd: date (nullable = true)
 |-- mean: double (nullable = true)
 |-- count: long (nullable = false)

我在 second_df 中有一些公司数据。我需要从 second_df 获取 first_df.

中列出的那些公司 ID 的数据

什么样的 spark api 对我有用? 我该怎么做?

谢谢。

问题扩展:

如果没有存储的记录,则 first_df 将为空。因此 first_df("mean") & first_df("count") 将为空,结果 "acc_new_mean" 为空。在那种情况下,我需要将 "new_mean" 设置为 second_df("mean") ,该怎么做? 我这样试过,但没有用 任何线索如何在这里处理 .withColumn("new_mean", ... ) ???

val acc_new_mean = (second_df("mean") + first_df("mean")) / (second_df("count") + first_df("count"))
    val acc_new_count  =  second_df("count") + first_df("count")


    val new_df = second_df.join(first_df.withColumnRenamed("company_id", "right_company_id").as("a"), 
                                 (  $"a.right_company_id"  === second_df("company_id") && ( second_df("min_dd")  > $"a.max_dd" ) ) 
                            , "leftOuter")
                            .withColumn("new_mean", if(acc_new_mean == null) lit(second_df("mean")) else  acc_new_mean )

方法 1:

如果您发现很难使用数据框的连接 API 连接 2 个数据框,您可以使用 sql 如果您在 sql 中感到舒适table。为此,您可以将 2 个数据帧注册为 spark 内存中的 tables,然后在上面写入 sql。

second_df.registerTempTable("table_second_df")
first_df.registerTempTable("table_first_df")

val new_df = spark.sql("select distinct s.* from table_second_df s join table_first_df f on s.company_id=f.company_id")
new_df.show()

按照您的要求,我已经添加了逻辑。

考虑一下您的 first_df 如下所示:

+----------+----------+----------+----+-----+
|company_id|    max_dd|    min_dd|mean|count|
+----------+----------+----------+----+-----+
|         A|2019-04-05|2019-04-01|  10|  100|
|         A|2019-04-06|2019-04-02|  20|  200|
|         B|2019-04-08|2019-04-01|  30|  300|
|         B|2019-04-09|2019-04-02|  40|  400|
+----------+----------+----------+----+-----+

考虑一下您的 second_df 如下所示:

+----------+----------+----+-----+
|company_id|    max_dd|mean|count|
+----------+----------+----+-----+
|         A|2019-04-03|  10|  100|
|         A|2019-04-02|  20|  200|
+----------+----------+----+-----+

由于公司 ID A 在第二个 table 中,我从 second_df 中获取了最新的 max_dd 记录。对于公司 ID B,它不在 second_df 我从 first_df 中获取了最新的 max_dd 记录。

请在下面找到代码。

first_df.registerTempTable("table_first_df")
second_df.registerTempTable("table_second_df")
val new_df = spark.sql("select company_id,max_dd,min_dd,mean,count from (select distinct s.company_id,s.max_dd,null as min_dd,s.mean,s.count,row_number() over (partition by s.company_id order by s.max_dd desc) rno from table_second_df s join table_first_df f on s.company_id=f.company_id) where rno=1 union select company_id,max_dd,min_dd,mean,count from (select distinct f.*,row_number() over (partition by f.company_id order by f.max_dd desc) rno from table_first_df f left join table_second_df s  on s.company_id=f.company_id where s.company_id is null) where rno=1")
new_df.show()

结果如下:

方法 2:

您可以使用 dataframe's API 的 join,而不是像我在 Approach 1 中提到的那样创建临时 table。这与 Approach 1 中的逻辑相同,但在这里我使用 dataframe's API 来完成此操作。请不要忘记导入 org.apache.spark.sql.expressions.Window,因为我在下面的代码中使用了 Window.patitionBy

val new_df = second_df.as('s).join(first_df.as('f),$"s.company_id" === $"f.company_id","inner").drop($"min_dd").withColumn("min_dd",lit("")).select($"s.company_id", $"s.max_dd",$"min_dd", $"s.mean", $"s.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"s.company_id").orderBy($"s.max_dd".desc))).filter($"Rno" === 1).drop($"Rno").union(first_df.as('f).join(second_df.as('s),$"s.company_id" === $"f.company_id","left_anti").select($"f.company_id", $"f.max_dd",$"f.min_dd", $"f.mean", $"f.count").dropDuplicates.withColumn("Rno", row_number().over(Window.partitionBy($"f.company_id").orderBy($"f.max_dd".desc))).filter($"Rno" === 1).drop($"Rno"))
new_df.show()

结果如下:

如果您有任何问题,请告诉我。

 val acc_new_mean = //new mean literaal
 val acc_new_count  =   //new count literaal


          val resultDf = computed_df.join(accumulated_results_df.as("a"), 
                             (  $"company_id"  === computed_df("company_id")  ) 
                        , "leftOuter")
                        .withColumn("new_mean", when( acc_new_mean.isNull,lit(computed_df("mean")) ).otherwise(acc_new_mean) )
                        .withColumn("new_count", when( acc_new_count.isNull,lit(computed_df("count")) ).otherwise(acc_new_count) )
                         .select(
                            computed_df("company_id"),
                            computed_df("max_dd"),
                            col("new_mean").as("mean"),
                            col("new_count").as("count")
                          )