PySpark 的第一个和最后一个函数一次完成一个分区
PySpark first and last function over a partition in one go
我有这样的 pyspark 代码,
spark_df = spark_df.orderBy('id', 'a1', 'c1')
out_df = spark_df.groupBy('id', 'a1', 'a2').agg(
F.first('c1').alias('c1'),
F.last('c2').alias('c2'),
F.first('c3').alias('c3'))
我需要让数据保持顺序id、a1和c1的顺序。然后 select 列,如上所示,覆盖在键 id、a1 和 c1 上定义的组。
由于第一个和最后一个不确定性,我将代码更改为这个看起来很丑的代码,但我不确定它是否有效。
w_first = Window.partitionBy('id', 'a1', 'a2').orderBy('c1')
w_last = Window.partitionBy('id', 'a1', 'a2').orderBy(F.desc('c1'))
out_first = spark_df.withColumn('Rank_First', F.rank().over(w_first)).filter(F.col('Rank_First') == 1).drop(
'Rank_First')
out_last = spark_df.withColumn('Rank_Last', F.rank().over(w_last)).filter(F.col('Rank_First') == 1).drop(
'Rank_Last')
out_first = out_first.withColumnRenamed('c1', 'First_c1') \
.withColumnRenamed('c2', 'First_c2') \
.withColumnRenamed('c3', 'First_c3')
out_last = out_last.withColumnRenamed('c1', 'Last_c1') \
.withColumnRenamed('c2', 'Last_c2') \
.withColumnRenamed('c3', 'Last_c3')
out_df = out_first.join(out_last, ['id', 'a1', 'a2']) \
.select('id', 'a1', 'a2', F.col('First_c1').alias('c1'),
F.col('Last_c2').alias('c2'),
F.col('First_c3').alias('c3'))
我正在尝试一种更好、更高效的替代方案。当数据量很大时,我 运行 会遇到性能瓶颈。
是否有更好的替代方法来一次性完成按特定顺序订购的 window。
将 orderBy
与 Window 一起使用时,您需要将帧边界指定为 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
,否则 last
函数只会获取 UNBOUNDED PRECEDING
和 UNBOUNDED PRECEDING
之间的最后一个值CURRENT ROW
(指定排序依据时的默认帧边界)。
试试这个:
w = Window.partitionBy('id', 'a1', 'a2').orderBy('c1') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = df.withColumn("First_c1", first("c1").over(w)) \
.withColumn("First_c3", first("c3").over(w)) \
.withColumn("Last_c2", last("c2").over(w))
df.groupby("id", "a1", "a2")\
.agg(first("First_c1").alias("c1"),
first("Last_c2").alias("c2"),
first("First_c3").alias("c3")
).show()
我有这样的 pyspark 代码,
spark_df = spark_df.orderBy('id', 'a1', 'c1')
out_df = spark_df.groupBy('id', 'a1', 'a2').agg(
F.first('c1').alias('c1'),
F.last('c2').alias('c2'),
F.first('c3').alias('c3'))
我需要让数据保持顺序id、a1和c1的顺序。然后 select 列,如上所示,覆盖在键 id、a1 和 c1 上定义的组。
由于第一个和最后一个不确定性,我将代码更改为这个看起来很丑的代码,但我不确定它是否有效。
w_first = Window.partitionBy('id', 'a1', 'a2').orderBy('c1')
w_last = Window.partitionBy('id', 'a1', 'a2').orderBy(F.desc('c1'))
out_first = spark_df.withColumn('Rank_First', F.rank().over(w_first)).filter(F.col('Rank_First') == 1).drop(
'Rank_First')
out_last = spark_df.withColumn('Rank_Last', F.rank().over(w_last)).filter(F.col('Rank_First') == 1).drop(
'Rank_Last')
out_first = out_first.withColumnRenamed('c1', 'First_c1') \
.withColumnRenamed('c2', 'First_c2') \
.withColumnRenamed('c3', 'First_c3')
out_last = out_last.withColumnRenamed('c1', 'Last_c1') \
.withColumnRenamed('c2', 'Last_c2') \
.withColumnRenamed('c3', 'Last_c3')
out_df = out_first.join(out_last, ['id', 'a1', 'a2']) \
.select('id', 'a1', 'a2', F.col('First_c1').alias('c1'),
F.col('Last_c2').alias('c2'),
F.col('First_c3').alias('c3'))
我正在尝试一种更好、更高效的替代方案。当数据量很大时,我 运行 会遇到性能瓶颈。
是否有更好的替代方法来一次性完成按特定顺序订购的 window。
将 orderBy
与 Window 一起使用时,您需要将帧边界指定为 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
,否则 last
函数只会获取 UNBOUNDED PRECEDING
和 UNBOUNDED PRECEDING
之间的最后一个值CURRENT ROW
(指定排序依据时的默认帧边界)。
试试这个:
w = Window.partitionBy('id', 'a1', 'a2').orderBy('c1') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = df.withColumn("First_c1", first("c1").over(w)) \
.withColumn("First_c3", first("c3").over(w)) \
.withColumn("Last_c2", last("c2").over(w))
df.groupby("id", "a1", "a2")\
.agg(first("First_c1").alias("c1"),
first("Last_c2").alias("c2"),
first("First_c3").alias("c3")
).show()