在 scala 中处理来自 spark 数据集的行组
processing group of rows from spark dataset in scala
我正在寻找一种方法将我的大型 spark 数据集划分为 groups/batches 并在某些函数中处理该组行。所以基本上应该将一组行输入到我的函数中,输出对我来说是单位,因为我不想聚合或更新输入记录,而只是执行一些计算。
为了理解,假设我有以下输入。
Col1
Col2
Col3
1
A
1
1
B
2
1
C
3
1
A
4
1
A
5
2
C
6
2
X
7
2
X
8
假设我需要按 col1 和 col2 分组,这将给我以下分组
(1,A,1), (1,A,4), (1,A,5) ---> 第一组
(1, B, 2) ---> 第二组
(1, C, 3), (1, C, 6)---> 第三组
(2, X, 7), (2, X, 8) ---> 第 4 组
所以我想将这些组传递给我的函数以执行一些逻辑。现在,假设我在该方法中对 Col3 求和。(这不是我的要求,但假设我想在我的单独方法中进行求和)。生成以下 o/p.
Col1
Col2
Col3
1
A
10
1
B
2
1
C
9
2
X
15
我怎样才能做到这一点,根据一些建议,我试图查看 UDAF 但找不到如何使用它的方法。请注意,我的真实输入数据集有超过 5 亿条记录。谢谢
这里是一个基于您的输入的简单示例,可帮助您入门:
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
data = [
(1, "A", 1),
(1, "B", 2),
(1, "C", 3),
(1, "A", 4),
(1, "A", 5),
(1, "C", 6),
(2, "X", 7),
(2, "X", 8),
]
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| 1|
| 1| B| 2|
| 1| C| 3|
| 1| A| 4|
| 1| A| 5|
| 1| C| 6|
| 2| X| 7|
| 2| X| 8|
+----+----+----+
# define your function - pure Python here, no Spark needed
def dummy_f(xs):
return sum(xs)
# apply your function as UDF - needs input function and return type (integer here)
(
df
.groupBy(F.col("col1"), F.col("col2"))
.agg(F.collect_list(F.col("col3").cast("int")).alias("col3"))
.withColumn("col3sum", F.udf(dummy_f, IntegerType())(F.col("col3")))
).show()
+----+----+---------+-------+
|col1|col2| col3|col3sum|
+----+----+---------+-------+
| 1| A|[1, 4, 5]| 10|
| 1| B| [2]| 2|
| 1| C| [3, 6]| 9|
| 2| X| [7, 8]| 15|
+----+----+---------+-------+
根据输入函数的需要聚合列是关键。您可以使用 create_map
创建字典或 collect_list
如此处所示。
我正在寻找一种方法将我的大型 spark 数据集划分为 groups/batches 并在某些函数中处理该组行。所以基本上应该将一组行输入到我的函数中,输出对我来说是单位,因为我不想聚合或更新输入记录,而只是执行一些计算。
为了理解,假设我有以下输入。
Col1 | Col2 | Col3 |
---|---|---|
1 | A | 1 |
1 | B | 2 |
1 | C | 3 |
1 | A | 4 |
1 | A | 5 |
2 | C | 6 |
2 | X | 7 |
2 | X | 8 |
假设我需要按 col1 和 col2 分组,这将给我以下分组
(1,A,1), (1,A,4), (1,A,5) ---> 第一组
(1, B, 2) ---> 第二组
(1, C, 3), (1, C, 6)---> 第三组
(2, X, 7), (2, X, 8) ---> 第 4 组
所以我想将这些组传递给我的函数以执行一些逻辑。现在,假设我在该方法中对 Col3 求和。(这不是我的要求,但假设我想在我的单独方法中进行求和)。生成以下 o/p.
Col1 | Col2 | Col3 |
---|---|---|
1 | A | 10 |
1 | B | 2 |
1 | C | 9 |
2 | X | 15 |
我怎样才能做到这一点,根据一些建议,我试图查看 UDAF 但找不到如何使用它的方法。请注意,我的真实输入数据集有超过 5 亿条记录。谢谢
这里是一个基于您的输入的简单示例,可帮助您入门:
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
data = [
(1, "A", 1),
(1, "B", 2),
(1, "C", 3),
(1, "A", 4),
(1, "A", 5),
(1, "C", 6),
(2, "X", 7),
(2, "X", 8),
]
df = spark.createDataFrame(data, ["col1", "col2", "col3"])
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| A| 1|
| 1| B| 2|
| 1| C| 3|
| 1| A| 4|
| 1| A| 5|
| 1| C| 6|
| 2| X| 7|
| 2| X| 8|
+----+----+----+
# define your function - pure Python here, no Spark needed
def dummy_f(xs):
return sum(xs)
# apply your function as UDF - needs input function and return type (integer here)
(
df
.groupBy(F.col("col1"), F.col("col2"))
.agg(F.collect_list(F.col("col3").cast("int")).alias("col3"))
.withColumn("col3sum", F.udf(dummy_f, IntegerType())(F.col("col3")))
).show()
+----+----+---------+-------+
|col1|col2| col3|col3sum|
+----+----+---------+-------+
| 1| A|[1, 4, 5]| 10|
| 1| B| [2]| 2|
| 1| C| [3, 6]| 9|
| 2| X| [7, 8]| 15|
+----+----+---------+-------+
根据输入函数的需要聚合列是关键。您可以使用 create_map
创建字典或 collect_list
如此处所示。