Merge rows of a Dataset and apply a custom function on some merged columns
Input data
+-------+------+-----+----------+--------+------+-------------------+
|KEY_1 |KEY_2 |KEY_3|EPOCH |DATA_1 |DATA_N|IMAGES |
+-------+------+-----+----------+--------+------+-------------------+
|0000001|6KBBCY|AA |1611826286|51183688|......|[[1611826286, 796]]|
|0000001|6KBBCY|AA |2043826286|51183688|......|[[2043826286, 799]]|
|0000001|6KBBCY|AA |1999999999|51183688|......|[[1999999999, 700]]|
|0000002|777777|XX |1611826555|51183799|......|[[1611826555, 500]]|
+-------+------+-----+----------+--------+------+-------------------+
IMAGES is a Seq() of Image:

case class Image(EPOCH: String, USE_CASE: String)
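The question does not show MyModel, so here is a hypothetical shape inferred from the table columns (the real model may well differ):

// Hypothetical row model inferred from the table above; the actual MyModel
// in the question may have more DATA_* columns or different types.
case class MyModel(
  KEY_1: String,
  KEY_2: String,
  KEY_3: String,
  EPOCH: String,
  DATA_1: String,
  DATA_N: String,
  IMAGES: Seq[Image]
)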
I want to merge the data in the table as follows: for each composite key <KEY_1, KEY_2, KEY_3>, merge the IMAGES columns and compute the EPOCH column on the fly as the minimum value extracted from the merged images. The DATA columns hold the same value for a given composite key. The dataset above would become:
Expected data
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|KEY_1 |KEY_2 |KEY_3|EPOCH |DATA_1 |DATA_N|IMAGES |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
|0000001|6KBBCY|AA |1611826286|51183688|......|[[1611826286, 796], [2043826286, 799], [1999999999, 700]]|
|0000002|777777|XX |1611826555|51183799|......|[[1611826555, 500]] |
+-------+------+-----+----------+--------+------+---------------------------------------------------------+
I have successfully merged the images for each composite key:
val inputRecords: Dataset[MyModel] = /* data initialisation */

import org.apache.spark.sql.functions._
import spark.implicits._ // assuming a SparkSession named `spark` is in scope, for the $"..." syntax

// Group by the composite key and collect the single Image of each row into one list per key.
val mergedImages: DataFrame = inputRecords.groupBy($"KEY_1", $"KEY_2", $"KEY_3").agg(
  collect_list($"IMAGES"(0)).as("IMAGES")
)
mergedImages.show(false)
Intermediate result
+-------+------+-----+---------------------------------------------------------+
|KEY_1  |KEY_2 |KEY_3|IMAGES                                                   |
+-------+------+-----+---------------------------------------------------------+
|0000001|6KBBCY|AA   |[[1611826286, 796], [2043826286, 799], [1999999999, 700]]|
+-------+------+-----+---------------------------------------------------------+
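One detail worth noting: $"IMAGES"(0) extracts only the first element of each row's array, which is fine here because every input row carries exactly one Image. If a row could carry several images, a variant along these lines (flatten is available since Spark 2.4) would collect them all:

// Hypothetical variant for rows whose IMAGES array may hold more than one element.
val mergedAll: DataFrame = inputRecords.groupBy($"KEY_1", $"KEY_2", $"KEY_3").agg(
  flatten(collect_list($"IMAGES")).as("IMAGES")
)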
Now I am a little confused about how to take advantage of Spark's parallelism for the next step. I could use sequential logic:
For each composite key in mergedImages:
* identify the matching rows in inputRecords
* merge them with custom logic
But that is the Java way of thinking. Is there a better way to do this in Spark?
Here is a way to obtain the second table shown in the question:
import org.apache.spark.sql.functions._

val mergedImages = inputRecords.groupBy(
  $"KEY_1", $"KEY_2", $"KEY_3"
).agg(
  min($"EPOCH").as("EPOCH"),
  // Apply first() to every remaining non-key column (they are constant
  // within a composite key), then append the merged image list.
  (inputRecords.columns.filterNot(
    Seq("EPOCH", "IMAGES", "KEY_1", "KEY_2", "KEY_3").contains(_)
  ).map(
    x => first(col(x)).as(x)
  ) :+ collect_list($"IMAGES"(0)).as("IMAGES")): _*
)
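Note that first() is a valid choice for the DATA columns only because, as stated in the question, they hold a single value per composite key; for columns that may vary within a key, a different aggregate would be needed.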
The aggregation can also be written out explicitly:
.agg(
min($"EPOCH").as("EPOCH"),
first($"DATA_1").as("DATA_1"),
first($"DATA_2").as("DATA_2"),
...
first($"DATA_N").as("DATA_N"),
collect_list($"IMAGES"(0)).as("IMAGES")
)
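For completeness, a minimal self-contained sketch of the whole pipeline, assuming the hypothetical MyModel and Image shapes sketched above (names, types, and sample values are inferred from the question, not confirmed by it):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("merge-images").getOrCreate()
import spark.implicits._

// Sample rows mirroring the input table above.
val inputRecords: Dataset[MyModel] = Seq(
  MyModel("0000001", "6KBBCY", "AA", "1611826286", "51183688", "......", Seq(Image("1611826286", "796"))),
  MyModel("0000001", "6KBBCY", "AA", "2043826286", "51183688", "......", Seq(Image("2043826286", "799"))),
  MyModel("0000001", "6KBBCY", "AA", "1999999999", "51183688", "......", Seq(Image("1999999999", "700"))),
  MyModel("0000002", "777777", "XX", "1611826555", "51183799", "......", Seq(Image("1611826555", "500")))
).toDS()

// EPOCH is a String here (to match Image), so min() compares lexicographically;
// that gives the same result as numeric comparison while all epochs have 10 digits.
val merged: DataFrame = inputRecords.groupBy($"KEY_1", $"KEY_2", $"KEY_3").agg(
  min($"EPOCH").as("EPOCH"),
  first($"DATA_1").as("DATA_1"),
  first($"DATA_N").as("DATA_N"),
  collect_list($"IMAGES"(0)).as("IMAGES")
)
// Note: collect_list does not guarantee the order of elements inside IMAGES.
merged.show(false)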