Duplicating the record count in Apache Spark

This is an extension of this question:

val sales = Seq(
  ("Warsaw", 2016, "facebook","share",100),
  ("Warsaw", 2017, "facebook","like",200),
  ("Boston", 2015,"twitter","share",50),
  ("Boston", 2016,"facebook","share",150),
  ("Toronto", 2017,"twitter","like",50)
).toDF("city", "year","media","action","amount")

That solution works well, but the expected output should count conditionally into different categories.

So the output should look like this:

+-------+--------+-----+
|   city|   media|count|
+-------+--------+-----+
| Boston|facebook|    1|
| Boston|  share1|    2|
| Boston|  share2|    2|
| Boston| twitter|    1|
|Toronto| twitter|    1|
|Toronto|    like|    1|
| Warsaw|facebook|    2|
| Warsaw|  share1|    1|
| Warsaw|  share2|    1|
| Warsaw|    like|    1|
+-------+--------+-----+

Here, if the action is share, I need to count it under both share1 and share2. When I compute this programmatically, I use a case statement along the lines of: case when action is share then share1 = share1 + 1, share2 = share2 + 1.

But how do I do this in Scala, PySpark, or SQL?

Simple filters and unions should give you your desired output:

// assumes spark.implicits._ and org.apache.spark.sql.functions.lit are in scope

// count per (city, media)
val media = sales.groupBy("city", "media").count()

// count per (city, action), renaming "action" to "media" so the unions line up
val action = sales.groupBy("city", "action").count()
  .select($"city", $"action".as("media"), $"count")

// the "share" rows, which get duplicated as share1 and share2
val share = action.filter($"media" === "share")

media.union(action.filter($"media" =!= "share"))
  .union(share.withColumn("media", lit("share1")))
  .union(share.withColumn("media", lit("share2")))
  .show(false)

which should give you

+-------+--------+-----+
|city   |media   |count|
+-------+--------+-----+
|Boston |facebook|1    |
|Boston |twitter |1    |
|Toronto|twitter |1    |
|Warsaw |facebook|2    |
|Warsaw |like    |1    |
|Toronto|like    |1    |
|Boston |share1  |2    |
|Warsaw |share1  |1    |
|Boston |share2  |2    |
|Warsaw |share2  |1    |
+-------+--------+-----+
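To see that the conditional counting itself is correct, the same logic can be sketched in plain Python without a Spark runtime, using a `Counter` keyed by `(city, label)`. This is only an illustration of the counting rule described in the question (a "share" action is tallied under both share1 and share2); it is not part of the Spark solution above.

```python
from collections import Counter

# rows mirroring the sales DataFrame: (city, year, media, action, amount)
sales = [
    ("Warsaw", 2016, "facebook", "share", 100),
    ("Warsaw", 2017, "facebook", "like", 200),
    ("Boston", 2015, "twitter", "share", 50),
    ("Boston", 2016, "facebook", "share", 150),
    ("Toronto", 2017, "twitter", "like", 50),
]

counts = Counter()
for city, _year, media, action, _amount in sales:
    counts[(city, media)] += 1           # count per (city, media)
    if action == "share":
        # a "share" action is counted under both share1 and share2
        counts[(city, "share1")] += 1
        counts[(city, "share2")] += 1
    else:
        counts[(city, action)] += 1      # other actions are counted once

for (city, label), n in sorted(counts.items()):
    print(city, label, n)
```

Running this prints the same (city, media, count) triples as the expected table, e.g. `Boston share1 2` and `Warsaw facebook 2`.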