合并数据集的行并对某些合并的列应用自定义函数

Merged rows of Dataset and apply a custom function on some merged columns

输入数据

+-------+------+-----+----------+--------+------+-------------------+
|KEY_1  |KEY_2 |KEY_3|EPOCH     |DATA_1  |DATA_N|IMAGES             |
+-------+------+-----+----------+--------+------+-------------------+
|0000001|6KBBCY|AA   |1611826286|51183688|......|[[1611826286, 796]]|
|0000001|6KBBCY|AA   |2043826286|51183688|......|[[2043826286, 799]]|
|0000001|6KBBCY|AA   |1999999999|51183688|......|[[1999999999, 700]]|
|0000002|777777|XX   |1611826555|51183799|......|[[1611826555, 500]]|
+-------+------+-----+----------+--------+------+-------------------+

IMAGESImageSeq():

case class Image ( EPOCH: String, USE_CASE : String )

我想合并 table 中的数据,如下所示: 对于每个复合键 <KEY_1, KEY_2, KEY_3>,合并 IMAGE 列并动态计算 EPOCH 列,作为从合并图像中提取的最小值。同一复合键的 DATA 列具有相同的值。上面的数据集将变为:

预期数据

+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|KEY_1  |KEY_2 |KEY_3|EPOCH     |DATA_1  |DATA_N|IMAGES                                                   |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|0000001|6KBBCY|AA   |1611826286|51183688|......|[[1611826286, 796], [2043826286, 799], [1999999999, 700]]|
|0000002|777777|XX   |1611826555|51183799|......|[[1611826555, 500]]                                      |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+

我已经成功地合并了每个复合键的图像:

val inputRecords: Dataset[MyModel] = /* data initialisation*/

import org.apache.spark.sql.functions._
val mergedImages: DataFrame = inputRecords.groupBy($"KEY_1", $"KEY_2", $"KEY_3").agg(
    collect_list($"IMAGES"(0)).as("IMAGES")
  )
mergedImages.show(false)

中间结果

+-------+------+-----+----------------------------------------------------------+
|KEY_1  |KEY_2 |KEY_3|IMAGES                                                    |
+-------+------+-----+------+---------------------------------------------------+
|0000001|6KBBCY|AA   |[[1611826286, 796], [2043826286, 799]], [1999999999, 700]]|
+-------+------+-----+----------------------------------------------------------+

现在我对下一步如何使用 Spark 的并行化优势有点困惑。我可以使用顺序逻辑:

For each composite key mergedImages:
  * identify those in inputRecords 
  *  merge them with a custom logic.

但这就是 Java 的思维方式。在 Spark 中有更好的方法吗?

这是获取问题中第二个 table 的方法:

import org.apache.spark.sql.functions._

val mergedImages = inputRecords.groupBy(
    $"KEY_1", $"KEY_2", $"KEY_3"
).agg(
    min($"EPOCH").as("EPOCH"),
    (inputRecords.columns.filterNot(
        Seq("EPOCH","IMAGES","KEY_1","KEY_2","KEY_3").contains(_)
    ).map(
        x => first(col(x)).as(x)
    ) :+ collect_list($"IMAGES"(0)).as("IMAGES")): _*
)

聚合可以用明码写成:

.agg(
    min($"EPOCH").as("EPOCH"),
    first($"DATA_1").as("DATA_1"),
    first($"DATA_2").as("DATA_2"),
    ...
    first($"DATA_N").as("DATA_N"),
    collect_list($"IMAGES"(0)).as("IMAGES")
)