Spark,组中 return 多行?
Spark, return multiple rows on group?
所以,我有一个包含以下数据的 Kafka 主题,我正在研究我们能否实现我们想要做的事情的概念验证。我之前试图在 Kafka 中解决它,但 Kafka 似乎不是正确的工具,所以现在看看 Spark :)
基本形式的数据如下所示:
+--+------------+-------+---------+
|id|serialNumber|source |company |
+--+------------+-------+---------+
|1 |123ABC |system1|Acme |
|2 |3285624 |system1|Ajax |
|3 |CDE567 |system1|Emca |
|4 |XX |system2|Ajax |
|5 |3285624 |system2|Ajax&Sons|
|6 |0147852 |system2|Ajax |
|7 |123ABC |system2|Acme |
|8 |CDE567 |system2|Xaja |
+--+------------+-------+---------+
主要分组列是 serialNumber,结果应该是 id 1 和 7 应该匹配,因为它与公司完全匹配。 Id 2 和 5 应该匹配,因为 id 2 中的公司与 id 5 中的公司完全部分匹配。Id 3 和 8 不应该匹配,因为公司不匹配。
我希望最终结果是这样的。请注意,来源并不固定为一两个,将来它会包含更多来源。
+------+-----+------------+-----------------+---------------+
|uuid |id |serialNumber|source |company |
+------+-----+------------+-----------------+---------------+
|<uuid>|[1,7]|123ABC |[system1,system2]|[Acme] |
|<uuid>|[2,5]|3285624 |[system1,system2]|[Ajax,Ajax&Sons|
|<uuid>|[3] |CDE567 |[system1] |[Emca] |
|<uuid>|[4] |XX |[system2] |[Ajax] |
|<uuid>|[6] |0147852 |[system2] |[Ajax] |
|<uuid>|[8] |CDE567 |[system2] |[Xaja] |
+------+-----+------------+-----------------+---------------+
我正在查看 groupByKey().mapGroups() 但在查找示例时遇到问题。 mapGroups() return 可以超过一行吗?
您可以简单地基于 serialNumber 列和所有其他列的 collect_list 进行分组。
代码:
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._
val ds = Seq((1,"123ABC", "system1", "Acme"),
(7,"123ABC", "system2", "Acme"))
.toDF("id", "serialNumber", "source", "company")
ds.groupBy("serialNumber")
.agg(
collect_list("id").alias("id"),
collect_list("source").alias("source"),
collect_list("company").alias("company")
)
.show(false)
输出:
+------------+------+------------------+------------+
|serialNumber|id |source |company |
+------------+------+------------------+------------+
|123ABC |[1, 7]|[system1, system2]|[Acme, Acme]|
+------------+------+------------------+------------+
如果您不想重复值,请使用 collect_set
ds.groupBy("serialNumber")
.agg(
collect_list("id").alias("id"),
collect_list("source").alias("source"),
collect_set("company").alias("company")
)
.show(false)
在公司列上使用 collect_set 输出:
+------------+------+------------------+-------+
|serialNumber|id |source |company|
+------------+------+------------------+-------+
|123ABC |[1, 7]|[system1, system2]|[Acme] |
+------------+------+------------------+-------+
所以,我有一个包含以下数据的 Kafka 主题,我正在研究我们能否实现我们想要做的事情的概念验证。我之前试图在 Kafka 中解决它,但 Kafka 似乎不是正确的工具,所以现在看看 Spark :)
基本形式的数据如下所示:
+--+------------+-------+---------+
|id|serialNumber|source |company |
+--+------------+-------+---------+
|1 |123ABC |system1|Acme |
|2 |3285624 |system1|Ajax |
|3 |CDE567 |system1|Emca |
|4 |XX |system2|Ajax |
|5 |3285624 |system2|Ajax&Sons|
|6 |0147852 |system2|Ajax |
|7 |123ABC |system2|Acme |
|8 |CDE567 |system2|Xaja |
+--+------------+-------+---------+
主要分组列是 serialNumber,结果应该是 id 1 和 7 应该匹配,因为它与公司完全匹配。 Id 2 和 5 应该匹配,因为 id 2 中的公司与 id 5 中的公司完全部分匹配。Id 3 和 8 不应该匹配,因为公司不匹配。
我希望最终结果是这样的。请注意,来源并不固定为一两个,将来它会包含更多来源。
+------+-----+------------+-----------------+---------------+
|uuid |id |serialNumber|source |company |
+------+-----+------------+-----------------+---------------+
|<uuid>|[1,7]|123ABC |[system1,system2]|[Acme] |
|<uuid>|[2,5]|3285624 |[system1,system2]|[Ajax,Ajax&Sons|
|<uuid>|[3] |CDE567 |[system1] |[Emca] |
|<uuid>|[4] |XX |[system2] |[Ajax] |
|<uuid>|[6] |0147852 |[system2] |[Ajax] |
|<uuid>|[8] |CDE567 |[system2] |[Xaja] |
+------+-----+------------+-----------------+---------------+
我正在查看 groupByKey().mapGroups() 但在查找示例时遇到问题。 mapGroups() return 可以超过一行吗?
您可以简单地基于 serialNumber 列和所有其他列的 collect_list 进行分组。
代码:
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._
val ds = Seq((1,"123ABC", "system1", "Acme"),
(7,"123ABC", "system2", "Acme"))
.toDF("id", "serialNumber", "source", "company")
ds.groupBy("serialNumber")
.agg(
collect_list("id").alias("id"),
collect_list("source").alias("source"),
collect_list("company").alias("company")
)
.show(false)
输出:
+------------+------+------------------+------------+
|serialNumber|id |source |company |
+------------+------+------------------+------------+
|123ABC |[1, 7]|[system1, system2]|[Acme, Acme]|
+------------+------+------------------+------------+
如果您不想重复值,请使用 collect_set
ds.groupBy("serialNumber")
.agg(
collect_list("id").alias("id"),
collect_list("source").alias("source"),
collect_set("company").alias("company")
)
.show(false)
在公司列上使用 collect_set 输出:
+------------+------+------------------+-------+
|serialNumber|id |source |company|
+------------+------+------------------+-------+
|123ABC |[1, 7]|[system1, system2]|[Acme] |
+------------+------+------------------+-------+