Spark,组中 return 多行?

Spark, return multiple rows on group?

所以,我有一个包含以下数据的 Kafka 主题,我正在研究我们能否实现我们想要做的事情的概念验证。我之前试图在 Kafka 中解决它,但 Kafka 似乎不是正确的工具,所以现在看看 Spark :)

基本形式的数据如下所示:

+--+------------+-------+---------+
|id|serialNumber|source |company  |
+--+------------+-------+---------+
|1 |123ABC      |system1|Acme     |
|2 |3285624     |system1|Ajax     |
|3 |CDE567      |system1|Emca     |
|4 |XX          |system2|Ajax     |
|5 |3285624     |system2|Ajax&Sons|
|6 |0147852     |system2|Ajax     |
|7 |123ABC      |system2|Acme     |
|8 |CDE567      |system2|Xaja     |
+--+------------+-------+---------+

主要分组列是 serialNumber,结果应该是 id 1 和 7 应该匹配,因为它与公司完全匹配。 Id 2 和 5 应该匹配,因为 id 2 中的公司与 id 5 中的公司完全部分匹配。Id 3 和 8 不应该匹配,因为公司不匹配。

我希望最终结果是这样的。请注意,来源并不固定为一两个,将来它会包含更多来源。

+------+-----+------------+-----------------+---------------+
|uuid  |id   |serialNumber|source           |company        |
+------+-----+------------+-----------------+---------------+
|<uuid>|[1,7]|123ABC      |[system1,system2]|[Acme]         |
|<uuid>|[2,5]|3285624     |[system1,system2]|[Ajax,Ajax&Sons|
|<uuid>|[3]  |CDE567      |[system1]        |[Emca]         |
|<uuid>|[4]  |XX          |[system2]        |[Ajax]         |
|<uuid>|[6]  |0147852     |[system2]        |[Ajax]         |
|<uuid>|[8]  |CDE567      |[system2]        |[Xaja]         |
+------+-----+------------+-----------------+---------------+

我正在查看 groupByKey().mapGroups() 但在查找示例时遇到问题。 mapGroups() return 可以超过一行吗?

您可以简单地基于 serialNumber 列和所有其他列的 collect_list 进行分组。

代码:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._

      val ds = Seq((1,"123ABC", "system1", "Acme"),
        (7,"123ABC", "system2", "Acme"))
        .toDF("id", "serialNumber", "source", "company")
      
      ds.groupBy("serialNumber")
        .agg(
          collect_list("id").alias("id"),
          collect_list("source").alias("source"),
          collect_list("company").alias("company")
        )
        .show(false)

输出:

+------------+------+------------------+------------+
|serialNumber|id    |source            |company     |
+------------+------+------------------+------------+
|123ABC      |[1, 7]|[system1, system2]|[Acme, Acme]|
+------------+------+------------------+------------+

如果您不想重复值,请使用 collect_set

  ds.groupBy("serialNumber")
    .agg(
      collect_list("id").alias("id"),
      collect_list("source").alias("source"),
      collect_set("company").alias("company")
    )
    .show(false)

在公司列上使用 collect_set 输出:

+------------+------+------------------+-------+
|serialNumber|id    |source            |company|
+------------+------+------------------+-------+
|123ABC      |[1, 7]|[system1, system2]|[Acme] |
+------------+------+------------------+-------+