在没有明显分区列的情况下使用多分区的 Spark window

Using Spark window with more than one partition when there is no obvious partitioning column

这是场景。假设我有以下 table:

identifier line
51169081604 2
00034886044 22
51168939455 52

挑战在于,对于每一列 line,select 下一个最大的列 line,我有通过以下 SQL:

完成
SELECT i1.line,i1.identifier, 
MAX(i1.line) OVER (
    ORDER BY i1.line ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
)AS parent
FROM global_temp.documentIdentifiers i1

挑战部分解决好了,问题是,当我在 Spark 上执行这段代码时,性能很糟糕。警告信息说的很清楚:

没有为 Window 操作定义分区!将所有数据移动到单个分区,这会导致严重的性能下降。

按这两个字段中的任何一个进行分区都不起作用,它会破坏结果,当然,因为每个创建的分区都不知道其他行。

有没有人知道如何在没有性能问题的情况下“select 下一个最大的列 ”?

谢谢

使用您的“下一个”方法并假设数据是按行升序生成的,以下确实可以并行工作,但如果实际上更快,您可以告诉我;我不知道你的数据量。在任何情况下你都不能只用 SQL (%sql).

来解决

这里是:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

case class X(identifier: Long, line: Long) // Too hard to explain, just gets around issues with df --> rdd --> df.

// Gen some more data.
val df = Seq(
 (1000000, 23), (1200, 56), (1201, 58), (1202, 60),
 (8200, 63), (890000, 67), (990000, 99), (33000, 123),
 (33001, 124), (33002, 126), (33009, 132), (33019, 133),
 (33029, 134), (33039, 135), (800, 201), (1800, 999),
 (1801, 1999), (1802, 2999), (1800444, 9999)
 ).toDF("identifier", "line")

// Add partition so as to be able to apply parallelism - except for upper boundary record.
val df2 = df.as[X]
            .rdd
            .mapPartitionsWithIndex((index, iter) => {
                iter.map(x => (index, x ))   
             }).mapValues(v => (v.identifier, v.line)).map(x => (x._1, x._2._1, x._2._2))
            .toDF("part", "identifier", "line")

// Process per partition.
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("part").orderBy("line")  
val df3 = df2.withColumn("next", lead("line", 1, null).over(w))

// Process upper boundary.
val df4 = df3.filter(df3("part") =!= 0).groupBy("part").agg(min("line").as("nxt")).toDF("pt", "nxt")
val df5 = df3.join(df4, (df3("part") === df4("pt") - 1), "outer" )
val df6 = df5.withColumn("next", when(col("next").isNull, col("nxt")).otherwise(col("next"))).select("identifier", "line", "next")

// Display. Sort accordingly.
df6.show(false)

returns:

+----------+----+----+
|identifier|line|next|
+----------+----+----+
|1000000   |23  |56  |
|1200      |56  |58  |
|1201      |58  |60  |
|1202      |60  |63  |
|8200      |63  |67  |
|890000    |67  |99  |
|990000    |99  |123 |
|33000     |123 |124 |
|33001     |124 |126 |
|33002     |126 |132 |
|33009     |132 |133 |
|33019     |133 |134 |
|33029     |134 |135 |
|33039     |135 |201 |
|800       |201 |999 |
|1800      |999 |1999|
|1801      |1999|2999|
|1802      |2999|9999|
|1800444   |9999|null|
+----------+----+----+

您可以添加额外的排序等。添加分区索引时依赖于窄转换。你如何加载可能是一个问题。不考虑缓存。

如果数据未按上述顺序排列,则需要先进行范围分区。