How can I do map reduce on spark dataframe group by conditional columns?

My Spark dataframe looks like this:

+------+------+-------+------+
|userid|useid1|userid2|score |
+------+------+-------+------+
|23    |null  |dsad   |3     |
|11    |44    |null   |4     |
|231   |null  |temp   |5     |
|231   |null  |temp   |2     |
+------+------+-------+------+

I want to do a calculation on each pair of userid and useid1/userid2 (whichever is not null).

If it is useid1, I multiply the score by 5; if it is userid2, I multiply the score by 3.

Finally, I want to add up all the scores for each pair.

The result should be:

+------+--------+-----------+
|userid|useid1/2|final score|
+------+--------+-----------+
|23    |dsad    |9          |
|11    |44      |20         |
|231   |temp    |21         |
+------+--------+-----------+

How can I do this?

For the groupBy part, I know DataFrame has a groupBy function, but I don't know whether I can use it conditionally, e.g. if useid1 is null, groupBy(userid, userid2); if userid2 is null, groupBy(userid, useid1).

For the calculation part, how do I multiply by 3 or 5 depending on the condition?

coalesce will do what is needed here.

df.withColumn("useid1/2", coalesce(col("useid1"), col("userid2")))

Basically, this function returns the first non-null value among its arguments, in order.

From the documentation:

COALESCE(T v1, T v2, ...)

Returns the first v that is not NULL, or NULL if all v's are NULL.

It requires importing org.apache.spark.sql.functions.coalesce (and col, for the col(...) syntax above).
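
To see what that gives on the question's sample data, here is a quick sketch of my own (not part of the answer above; it assumes the sample dataframe is available as df):

import org.apache.spark.sql.functions.{coalesce, col}

// df is assumed to be the question's dataframe: userid, useid1, userid2, score
df.select(
    col("userid"),
    coalesce(col("useid1"), col("userid2")).as("useid1/2"),
    col("score")
  ).show()
// useid1/2 comes out as: dsad, 44, temp, temp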

The following solution should help solve your problem.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes an active SparkSession named spark

// userDF is the question's dataframe: userid, useid1, userid2, score
val groupByUserWinFun = Window.partitionBy("userid", "useid1/2")
val finalScoreDF = userDF
  .withColumn("useid1/2", when($"useid1".isNull, $"userid2").otherwise($"useid1"))
  .withColumn("finalscore", when($"useid1".isNull, $"score" * 3).otherwise($"score" * 5))
  .withColumn("finalscore", sum("finalscore").over(groupByUserWinFun))
  .select("userid", "useid1/2", "finalscore")
  .distinct()

Using the when method from Spark SQL, select useid1 or userid2 and multiply the score based on the condition.

Output:

+------+--------+----------+
|userid|useid1/2|finalscore|
+------+--------+----------+
|   11 |      44|      20.0|
|   23 |    dsad|       9.0|
|   231|    temp|      21.0|
+------+--------+----------+
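
For comparison, the same logic can also be written in plain Spark SQL. This is only a sketch of my own, assuming the input dataframe has been registered as a temporary view named users (both the view name and the finalScoreSQL variable are made up for the example):

userDF.createOrReplaceTempView("users")

val finalScoreSQL = spark.sql("""
  SELECT userid,
         COALESCE(useid1, userid2) AS `useid1/2`,
         SUM(score * CASE WHEN useid1 IS NULL THEN 3 ELSE 5 END) AS finalscore
  FROM users
  GROUP BY userid, COALESCE(useid1, userid2)
""")
finalScoreSQL.show()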

Using groupBy:

import org.apache.spark.sql.functions.{coalesce, sum, when}
import spark.implicits._ // assumes an active SparkSession named spark

val original = Seq(
  (23, null, "dsad", 3),
  (11, "44", null, 4),
  (231, null, "temp", 5),
  (231, null, "temp", 2)
).toDF("userid", "useid1", "userid2", "score")

val result = original
  .withColumn("useid1/2", coalesce($"useid1", $"userid2"))                   // pick whichever id is non-null
  .withColumn("score", $"score" * when($"useid1".isNotNull, 5).otherwise(3)) // weight by 5 or 3
  .groupBy("userid", "useid1/2")
  .agg(sum("score").alias("final score"))

result.show(false)

Output:

+------+--------+-----------+
|userid|useid1/2|final score|
+------+--------+-----------+
|23    |dsad    |9          |
|231   |temp    |21         |
|11    |44      |20         |
+------+--------+-----------+
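
Both answers produce the same totals. The difference is in how the aggregation is expressed: the first computes the sum with a window function and then calls distinct() to collapse the duplicated rows, while the second groups directly on (userid, useid1/2) and aggregates with sum, which is arguably the simpler way to state this particular aggregation.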