在没有明显分区列的情况下使用多分区的 Spark window
Using Spark window with more than one partition when there is no obvious partitioning column
这是场景。假设我有以下 table:
identifier
line
51169081604
2
00034886044
22
51168939455
52
挑战在于,对于每一列 line,select 下一个最大的列 line,我有通过以下 SQL:
完成
SELECT i1.line,i1.identifier,
MAX(i1.line) OVER (
ORDER BY i1.line ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
)AS parent
FROM global_temp.documentIdentifiers i1
挑战部分解决好了,问题是,当我在 Spark 上执行这段代码时,性能很糟糕。警告信息说的很清楚:
没有为 Window 操作定义分区!将所有数据移动到单个分区,这会导致严重的性能下降。
按这两个字段中的任何一个进行分区都不起作用,它会破坏结果,当然,因为每个创建的分区都不知道其他行。
有没有人知道如何在没有性能问题的情况下“select 下一个最大的列 行”?
谢谢
使用您的“下一个”方法并假设数据是按行升序生成的,以下确实可以并行工作,但如果实际上更快,您可以告诉我;我不知道你的数据量。在任何情况下你都不能只用 SQL (%sql).
来解决
这里是:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class X(identifier: Long, line: Long) // Too hard to explain, just gets around issues with df --> rdd --> df.
// Gen some more data.
val df = Seq(
(1000000, 23), (1200, 56), (1201, 58), (1202, 60),
(8200, 63), (890000, 67), (990000, 99), (33000, 123),
(33001, 124), (33002, 126), (33009, 132), (33019, 133),
(33029, 134), (33039, 135), (800, 201), (1800, 999),
(1801, 1999), (1802, 2999), (1800444, 9999)
).toDF("identifier", "line")
// Add partition so as to be able to apply parallelism - except for upper boundary record.
val df2 = df.as[X]
.rdd
.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x ))
}).mapValues(v => (v.identifier, v.line)).map(x => (x._1, x._2._1, x._2._2))
.toDF("part", "identifier", "line")
// Process per partition.
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("part").orderBy("line")
val df3 = df2.withColumn("next", lead("line", 1, null).over(w))
// Process upper boundary.
val df4 = df3.filter(df3("part") =!= 0).groupBy("part").agg(min("line").as("nxt")).toDF("pt", "nxt")
val df5 = df3.join(df4, (df3("part") === df4("pt") - 1), "outer" )
val df6 = df5.withColumn("next", when(col("next").isNull, col("nxt")).otherwise(col("next"))).select("identifier", "line", "next")
// Display. Sort accordingly.
df6.show(false)
returns:
+----------+----+----+
|identifier|line|next|
+----------+----+----+
|1000000 |23 |56 |
|1200 |56 |58 |
|1201 |58 |60 |
|1202 |60 |63 |
|8200 |63 |67 |
|890000 |67 |99 |
|990000 |99 |123 |
|33000 |123 |124 |
|33001 |124 |126 |
|33002 |126 |132 |
|33009 |132 |133 |
|33019 |133 |134 |
|33029 |134 |135 |
|33039 |135 |201 |
|800 |201 |999 |
|1800 |999 |1999|
|1801 |1999|2999|
|1802 |2999|9999|
|1800444 |9999|null|
+----------+----+----+
您可以添加额外的排序等。添加分区索引时依赖于窄转换。你如何加载可能是一个问题。不考虑缓存。
如果数据未按上述顺序排列,则需要先进行范围分区。
这是场景。假设我有以下 table:
identifier | line |
---|---|
51169081604 | 2 |
00034886044 | 22 |
51168939455 | 52 |
挑战在于,对于每一列 line,select 下一个最大的列 line,我有通过以下 SQL:
完成SELECT i1.line,i1.identifier,
MAX(i1.line) OVER (
ORDER BY i1.line ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
)AS parent
FROM global_temp.documentIdentifiers i1
挑战部分解决好了,问题是,当我在 Spark 上执行这段代码时,性能很糟糕。警告信息说的很清楚:
没有为 Window 操作定义分区!将所有数据移动到单个分区,这会导致严重的性能下降。
按这两个字段中的任何一个进行分区都不起作用,它会破坏结果,当然,因为每个创建的分区都不知道其他行。
有没有人知道如何在没有性能问题的情况下“select 下一个最大的列 行”?
谢谢
使用您的“下一个”方法并假设数据是按行升序生成的,以下确实可以并行工作,但如果实际上更快,您可以告诉我;我不知道你的数据量。在任何情况下你都不能只用 SQL (%sql).
来解决这里是:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class X(identifier: Long, line: Long) // Too hard to explain, just gets around issues with df --> rdd --> df.
// Gen some more data.
val df = Seq(
(1000000, 23), (1200, 56), (1201, 58), (1202, 60),
(8200, 63), (890000, 67), (990000, 99), (33000, 123),
(33001, 124), (33002, 126), (33009, 132), (33019, 133),
(33029, 134), (33039, 135), (800, 201), (1800, 999),
(1801, 1999), (1802, 2999), (1800444, 9999)
).toDF("identifier", "line")
// Add partition so as to be able to apply parallelism - except for upper boundary record.
val df2 = df.as[X]
.rdd
.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x ))
}).mapValues(v => (v.identifier, v.line)).map(x => (x._1, x._2._1, x._2._2))
.toDF("part", "identifier", "line")
// Process per partition.
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("part").orderBy("line")
val df3 = df2.withColumn("next", lead("line", 1, null).over(w))
// Process upper boundary.
val df4 = df3.filter(df3("part") =!= 0).groupBy("part").agg(min("line").as("nxt")).toDF("pt", "nxt")
val df5 = df3.join(df4, (df3("part") === df4("pt") - 1), "outer" )
val df6 = df5.withColumn("next", when(col("next").isNull, col("nxt")).otherwise(col("next"))).select("identifier", "line", "next")
// Display. Sort accordingly.
df6.show(false)
returns:
+----------+----+----+
|identifier|line|next|
+----------+----+----+
|1000000 |23 |56 |
|1200 |56 |58 |
|1201 |58 |60 |
|1202 |60 |63 |
|8200 |63 |67 |
|890000 |67 |99 |
|990000 |99 |123 |
|33000 |123 |124 |
|33001 |124 |126 |
|33002 |126 |132 |
|33009 |132 |133 |
|33019 |133 |134 |
|33029 |134 |135 |
|33039 |135 |201 |
|800 |201 |999 |
|1800 |999 |1999|
|1801 |1999|2999|
|1802 |2999|9999|
|1800444 |9999|null|
+----------+----+----+
您可以添加额外的排序等。添加分区索引时依赖于窄转换。你如何加载可能是一个问题。不考虑缓存。
如果数据未按上述顺序排列,则需要先进行范围分区。