How can I do map reduce on spark dataframe group by conditional columns?
My Spark DataFrame looks like this:
+------+------+-------+------+
|userid|useid1|userid2|score |
+------+------+-------+------+
|23 |null |dsad |3 |
|11 |44 |null |4 |
|231 |null |temp |5 |
|231 |null |temp |2 |
+------+------+-------+------+
I want to do a calculation for every pair of userid and useid1/userid2 (whichever is not null).
If the value comes from useid1, I multiply the score by 5; if it comes from userid2, I multiply the score by 3.
Finally, I want to add up all the scores for each pair.
The result should be:
+------+--------+-----------+
|userid|useid1/2|final score|
+------+--------+-----------+
|23 |dsad |9 |
|11 |44 |20 |
|231 |temp |21 |
+------+--------+-----------+
How can I do this?
For the groupBy part, I know DataFrame has a groupBy function, but I don't know whether I can use it conditionally, e.g. if useid1 is null, groupBy(userid, userid2), and if userid2 is null, groupBy(userid, useid1).
For the calculation part, how can I multiply the score by 3 or 5 depending on the condition?
coalesce will do what you need.
df.withColumn("useid1/2", coalesce(col("useid1"), col("userid2")))
Basically, this function returns the first non-null value, in the order of its arguments.
From the documentation:
COALESCE(T v1, T v2, ...)
Returns the first v that is not NULL, or NULL if all v's are NULL.
You need the import import org.apache.spark.sql.functions.coalesce.
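For example, applied to the sample frame above (a minimal sketch; df stands for the questioner's DataFrame, and spark.implicits._ is assumed to be imported for the $ syntax):
import org.apache.spark.sql.functions.coalesce

// merge the two id columns into one key, taking whichever side is non-null
df.select($"userid", coalesce($"useid1", $"userid2").as("useid1/2")).show(false)
// +------+--------+
// |userid|useid1/2|
// +------+--------+
// |23    |dsad    |
// |11    |44      |
// |231   |temp    |
// |231   |temp    |
// +------+--------+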
The following solution will help solve your problem.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val groupByUserWinFun = Window.partitionBy("userid", "useid1/2")
val finalScoreDF = userDF
  // pick whichever id column is non-null as the grouping key
  .withColumn("useid1/2", when($"useid1".isNull, $"userid2").otherwise($"useid1"))
  // weight the score: 3 when the value came from userid2, 5 when from useid1
  .withColumn("finalscore", when($"useid1".isNull, $"score" * 3).otherwise($"score" * 5))
  // total per pair over the window, then drop the duplicate rows it leaves behind
  .withColumn("finalscore", sum("finalscore").over(groupByUserWinFun))
  .select("userid", "useid1/2", "finalscore").distinct()
The when method selects useid1 or userid2 and multiplies the score by the appropriate value based on the condition.
Output:
+------+--------+----------+
|userid|useid1/2|finalscore|
+------+--------+----------+
| 11 | 44| 20.0|
| 23 | dsad| 9.0|
| 231| temp| 21.0|
+------+--------+----------+
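For reference, the same logic can also be written as a raw Spark SQL query instead of the DataFrame API (a hedged sketch; the temp view name users is an assumption):
userDF.createOrReplaceTempView("users") // "users" is an arbitrary view name

val finalScoreSQL = spark.sql("""
  SELECT userid,
         COALESCE(useid1, userid2) AS `useid1/2`,
         SUM(score * CASE WHEN useid1 IS NOT NULL THEN 5 ELSE 3 END) AS finalscore
  FROM users
  GROUP BY userid, COALESCE(useid1, userid2)
""")
finalScoreSQL.show(false)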
Using groupBy:
import org.apache.spark.sql.functions.{coalesce, sum, when}
import spark.implicits._ // for $-notation and toDF

val original = Seq(
  (23, null, "dsad", 3),
  (11, "44", null, 4),
  (231, null, "temp", 5),
  (231, null, "temp", 2)
).toDF("userid", "useid1", "userid2", "score")
// compute the merged key and the weighted score, then aggregate per pair
val result = original
  .withColumn("useid1/2", coalesce($"useid1", $"userid2"))
  .withColumn("score", $"score" * when($"useid1".isNotNull, 5).otherwise(3))
  .groupBy("userid", "useid1/2")
  .agg(sum("score").alias("final score"))
result.show(false)
Output:
+------+--------+-----------+
|userid|useid1/2|final score|
+------+--------+-----------+
|23 |dsad |9 |
|231 |temp |21 |
|11 |44 |20 |
+------+--------+-----------+