在 scala 中处理来自 spark 数据集的行组

processing group of rows from spark dataset in scala

我正在寻找一种方法将我的大型 spark 数据集划分为 groups/batches 并在某些函数中处理该组行。所以基本上应该将一组行输入到我的函数中,输出对我来说是单位,因为我不想聚合或更新输入记录,而只是执行一些计算。

为了理解,假设我有以下输入。

Col1 Col2 Col3
1 A 1
1 B 2
1 C 3
1 A 4
1 A 5
2 C 6
2 X 7
2 X 8

假设我需要按 col1 和 col2 分组,这将给我以下分组

(1,A,1), (1,A,4), (1,A,5) ---> 第一组

(1, B, 2) ---> 第二组

(1, C, 3), (1, C, 6)---> 第三组

(2, X, 7), (2, X, 8) ---> 第 4 组

所以我想将这些组传递给我的函数以执行一些逻辑。现在,假设我在该方法中对 Col3 求和。(这不是我的要求,但假设我想在我的单独方法中进行求和)。生成以下 o/p.

Col1 Col2 Col3
1 A 10
1 B 2
1 C 9
2 X 15

我怎样才能做到这一点,根据一些建议,我试图查看 UDAF 但找不到如何使用它的方法。请注意,我的真实输入数据集有超过 5 亿条记录。谢谢

这里是一个基于您的输入的简单示例,可帮助您入门:

    from pyspark.sql.types import IntegerType
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    spark = SparkSession.builder.getOrCreate()
    
    data = [
        (1, "A",  1),
        (1, "B",  2),
        (1, "C",  3),
        (1, "A",  4),
        (1, "A",  5),
        (1, "C",  6),
        (2, "X",  7),
        (2, "X",  8),
    ]
    
    df = spark.createDataFrame(data, ["col1", "col2", "col3"])
    df.show()
    
    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|   A|   1|
    |   1|   B|   2|
    |   1|   C|   3|
    |   1|   A|   4|
    |   1|   A|   5|
    |   1|   C|   6|
    |   2|   X|   7|
    |   2|   X|   8|
    +----+----+----+
    
    # define your function - pure Python here, no Spark needed
    def dummy_f(xs):
      return sum(xs)
    
    
    # apply your function as UDF - needs input function and return type (integer here)
    (
      df
      .groupBy(F.col("col1"), F.col("col2"))
      .agg(F.collect_list(F.col("col3").cast("int")).alias("col3"))
      .withColumn("col3sum", F.udf(dummy_f, IntegerType())(F.col("col3")))
    ).show()
    
    +----+----+---------+-------+
    |col1|col2|     col3|col3sum|
    +----+----+---------+-------+
    |   1|   A|[1, 4, 5]|     10|
    |   1|   B|      [2]|      2|
    |   1|   C|   [3, 6]|      9|
    |   2|   X|   [7, 8]|     15|
    +----+----+---------+-------+

根据输入函数的需要聚合列是关键。您可以使用 create_map 创建字典或 collect_list 如此处所示。