在限制行数的同时对数据框执行 groupBy
Performing a groupBy on a dataframe while limiting the number of rows
我有一个包含“id”列和“publication”列的数据框。 “id”列包含重复项,代表研究人员。 “发表”栏包含研究人员发表的学术著作的一些信息。
我想转换此数据框以将出版物收集到一个数组中,从而减少行数。我可以使用 groupBy 和 collect_list 来做到这一点。这将使“id”列仅包含唯一值。
myDataframe
.groupBy("id")
.agg(
collect_list("publication").as("publications")
).select("id", "publications")
但是,就我的目的而言,一行的数据太多了。我想限制收集的出版物数量,并将数据分成多行。
让我的数据框看起来像这样,其中 1 的 id 出现在 10 行中:
| id | publication |
| ----| -------------- |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 2 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
我想按 id 分组并将出版物收集到一个列表中,但将其限制为每组最多 5 个出版物:
| id | publication |
| ----| -------------- |
| 1 | ["foobar",...] |
| 1 | ["foobar",...] |
| 2 | ["foobar"] |
我如何在 spark scala 中完成这个?
在 window 上的 df 中添加 row_number() 列,使用与分组相同的键
.withColumn('col', row_number().over(Window.partitionBy('id'))
使用此行 num modulo 5 或除以 5 并截断为整数创建新 ID
然后将 y 分组
如果您想要每行固定数量的出版物,您必须首先计算每个研究人员每个出版物的中间桶数。您可以通过出版物的 运行k 的整数除法 / 5(或者每个列表中您想要的出版物数量)来确定存储桶编号。然后,您可以对 id 和 bucket 编号进行分组。这是我 运行 在 spark-shell
:
中的示例
val testDF = Seq(
(1, "pub1"),
(1, "pub2"),
(1, "pub3"),
(1, "pub4"),
(1, "pub5"),
(1, "pub6"),
(1, "pub7"),
(1, "pub8"),
(2, "pub9"),
(2, "pub10"),
(2, "pub11"),
(2, "pub12"),
(2, "pub13")).toDF("id", "publication")
testDF.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("id")) - 1)
.withColumn("bucket", floor(col("rn") / 5))
.groupBy("id", "bucket").agg(collect_list("publication").as("publications"))
.select("id", "publications")
.show(false)
输出:
+---+----------------------------------+
|id |publications |
+---+----------------------------------+
|1 |[pub1, pub2, pub3, pub4, pub5] |
|1 |[pub6, pub7, pub8] |
|2 |[pub9, pub10, pub11, pub12, pub13]|
+---+----------------------------------+
我有一个包含“id”列和“publication”列的数据框。 “id”列包含重复项,代表研究人员。 “发表”栏包含研究人员发表的学术著作的一些信息。
我想转换此数据框以将出版物收集到一个数组中,从而减少行数。我可以使用 groupBy 和 collect_list 来做到这一点。这将使“id”列仅包含唯一值。
myDataframe
.groupBy("id")
.agg(
collect_list("publication").as("publications")
).select("id", "publications")
但是,就我的目的而言,一行的数据太多了。我想限制收集的出版物数量,并将数据分成多行。
让我的数据框看起来像这样,其中 1 的 id 出现在 10 行中:
| id | publication |
| ----| -------------- |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 2 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
我想按 id 分组并将出版物收集到一个列表中,但将其限制为每组最多 5 个出版物:
| id | publication |
| ----| -------------- |
| 1 | ["foobar",...] |
| 1 | ["foobar",...] |
| 2 | ["foobar"] |
我如何在 spark scala 中完成这个?
在 window 上的 df 中添加 row_number() 列,使用与分组相同的键
.withColumn('col', row_number().over(Window.partitionBy('id'))
使用此行 num modulo 5 或除以 5 并截断为整数创建新 ID
然后将 y 分组
如果您想要每行固定数量的出版物,您必须首先计算每个研究人员每个出版物的中间桶数。您可以通过出版物的 运行k 的整数除法 / 5(或者每个列表中您想要的出版物数量)来确定存储桶编号。然后,您可以对 id 和 bucket 编号进行分组。这是我 运行 在 spark-shell
:
val testDF = Seq(
(1, "pub1"),
(1, "pub2"),
(1, "pub3"),
(1, "pub4"),
(1, "pub5"),
(1, "pub6"),
(1, "pub7"),
(1, "pub8"),
(2, "pub9"),
(2, "pub10"),
(2, "pub11"),
(2, "pub12"),
(2, "pub13")).toDF("id", "publication")
testDF.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("id")) - 1)
.withColumn("bucket", floor(col("rn") / 5))
.groupBy("id", "bucket").agg(collect_list("publication").as("publications"))
.select("id", "publications")
.show(false)
输出:
+---+----------------------------------+
|id |publications |
+---+----------------------------------+
|1 |[pub1, pub2, pub3, pub4, pub5] |
|1 |[pub6, pub7, pub8] |
|2 |[pub9, pub10, pub11, pub12, pub13]|
+---+----------------------------------+