Spark 中的条件串联
Conditional Concatenation in Spark
我有一个具有以下结构的数据框:
+----------+------+------+----------------+--------+------+
| date|market|metric|aggregator_Value|type |rank |
+----------+------+------+----------------+--------+------+
|2018-08-05| m1| 16 | m1|median | 1 |
|2018-08-03| m1| 5 | m1|median | 2 |
|2018-08-01| m1| 10 | m1|mean | 3 |
|2018-08-05| m2| 35 | m2|mean | 1 |
|2018-08-03| m2| 25 | m2|mean | 2 |
|2018-08-01| m2| 5 | m2|mean | 3 |
+----------+------+------+----------------+--------+------+
在此数据框中,排名列是根据市场列的日期和分组顺序计算的。
像这样
val w_rank = Window.partitionBy("market").orderBy(desc("date"))
val outputDF2=outputDF1.withColumn("rank",rank().over(w_rank))
我想在 rank=1 时提取输出数据框中度量列的连接值,条件是如果 type="median" in rank=1 行然后连接所有度量该市场的值。否则,如果 type="mean" in rank=1 行,则仅连接前 2 个指标值。像这样
+----------+------+------+----------------+--------+---------+
| date|market|metric|aggregator_Value|type |result |
+----------+------+------+----------------+--------+---------+
|2018-08-05| m1| 16 | m1|median |10|5|16 |
|2018-08-05| m2| 35 | m1|mean |25|35 |
+----------+------+------+----------------+--------+---------+
我怎样才能做到这一点?
您可以根据具体情况取消列metric
并应用collect_list
,然后应用concat_ws
以获得想要的结果,如下所示:
val df = Seq(
("2018-08-05", "m1", 16, "m1", "median", 1),
("2018-08-03", "m1", 5, "m1", "median", 2),
("2018-08-01", "m1", 10, "m1", "mean", 3),
("2018-08-05", "m2", 35, "m2", "mean", 1),
("2018-08-03", "m2", 25, "m2", "mean", 2),
("2018-08-01", "m2", 5, "m2", "mean", 3)
).toDF("date", "market", "metric", "aggregator_value", "type", "rank")
val win_desc = Window.partitionBy("market").orderBy(desc("date"))
val win_asc = Window.partitionBy("market").orderBy(asc("date"))
df.
withColumn("rank1_type", first($"type").over(win_desc.rowsBetween(Window.unboundedPreceding, 0))).
withColumn("cond_metric", when($"rank1_type" === "mean" && $"rank" > 2, null).otherwise($"metric")).
withColumn("result", concat_ws("|", collect_list("cond_metric").over(win_asc))).
where($"rank" === 1).
show
// +----------+------+------+----------------+------+----+----------+-----------+-------+
// | date|market|metric|aggregator_value| type|rank|rank1_type|cond_metric| result|
// +----------+------+------+----------------+------+----+----------+-----------+-------+
// |2018-08-05| m1| 16| m1|median| 1| median| 16|10|5|16|
// |2018-08-05| m2| 35| m2| mean| 1| mean| 35| 25|35|
// +----------+------+------+----------------+------+----+----------+-----------+-------+
我有一个具有以下结构的数据框:
+----------+------+------+----------------+--------+------+
| date|market|metric|aggregator_Value|type |rank |
+----------+------+------+----------------+--------+------+
|2018-08-05| m1| 16 | m1|median | 1 |
|2018-08-03| m1| 5 | m1|median | 2 |
|2018-08-01| m1| 10 | m1|mean | 3 |
|2018-08-05| m2| 35 | m2|mean | 1 |
|2018-08-03| m2| 25 | m2|mean | 2 |
|2018-08-01| m2| 5 | m2|mean | 3 |
+----------+------+------+----------------+--------+------+
在此数据框中,排名列是根据市场列的日期和分组顺序计算的。 像这样
val w_rank = Window.partitionBy("market").orderBy(desc("date"))
val outputDF2=outputDF1.withColumn("rank",rank().over(w_rank))
我想在 rank=1 时提取输出数据框中度量列的连接值,条件是如果 type="median" in rank=1 行然后连接所有度量该市场的值。否则,如果 type="mean" in rank=1 行,则仅连接前 2 个指标值。像这样
+----------+------+------+----------------+--------+---------+
| date|market|metric|aggregator_Value|type |result |
+----------+------+------+----------------+--------+---------+
|2018-08-05| m1| 16 | m1|median |10|5|16 |
|2018-08-05| m2| 35 | m1|mean |25|35 |
+----------+------+------+----------------+--------+---------+
我怎样才能做到这一点?
您可以根据具体情况取消列metric
并应用collect_list
,然后应用concat_ws
以获得想要的结果,如下所示:
val df = Seq(
("2018-08-05", "m1", 16, "m1", "median", 1),
("2018-08-03", "m1", 5, "m1", "median", 2),
("2018-08-01", "m1", 10, "m1", "mean", 3),
("2018-08-05", "m2", 35, "m2", "mean", 1),
("2018-08-03", "m2", 25, "m2", "mean", 2),
("2018-08-01", "m2", 5, "m2", "mean", 3)
).toDF("date", "market", "metric", "aggregator_value", "type", "rank")
val win_desc = Window.partitionBy("market").orderBy(desc("date"))
val win_asc = Window.partitionBy("market").orderBy(asc("date"))
df.
withColumn("rank1_type", first($"type").over(win_desc.rowsBetween(Window.unboundedPreceding, 0))).
withColumn("cond_metric", when($"rank1_type" === "mean" && $"rank" > 2, null).otherwise($"metric")).
withColumn("result", concat_ws("|", collect_list("cond_metric").over(win_asc))).
where($"rank" === 1).
show
// +----------+------+------+----------------+------+----+----------+-----------+-------+
// | date|market|metric|aggregator_value| type|rank|rank1_type|cond_metric| result|
// +----------+------+------+----------------+------+----+----------+-----------+-------+
// |2018-08-05| m1| 16| m1|median| 1| median| 16|10|5|16|
// |2018-08-05| m2| 35| m2| mean| 1| mean| 35| 25|35|
// +----------+------+------+----------------+------+----+----------+-----------+-------+