How to generate incremental sub_id not unique using Pyspark
My goal is to create a random id and an incremental sub_id. You will find a more detailed explanation of my problem below.
Initial dataframe:
import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
(1, 110, 'aaa', 'walk', 'work'),
(2, 110, 'aaa', 'walk', 'work'),
(3, 110, 'aaa', 'bus', 'work'),
(4, 110, 'aaa', 'bus', 'work'),
(5, 110, 'aaa', 'walk','work'),
(6, 110, 'bbb', 'walk', 'home'),
(7, 110, 'bbb', 'bus', 'home'),
(8, 110, 'bbb', 'bus', 'home'),
(9, 110, 'bbb', 'walk', 'home')
],
['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
)
df.show()
+---+------+------+----+----+
|idx|u_uuid|p_uuid|mode|dest|
+---+------+------+----+----+
| 1| 110| aaa|walk|work|
| 2| 110| aaa|walk|work|
| 3| 110| aaa| bus|work|
| 4| 110| aaa| bus|work|
| 5| 110| aaa|walk|work|
| 6| 110| bbb|walk|home|
| 7| 110| bbb| bus|home|
| 8| 110| bbb| bus|home|
| 9| 110| bbb|walk|home|
+---+------+------+----+----+
To generate the trip_id (which can be random), I used:
df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest'))).sort('idx')
+---+------+------+----+----+-------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|
+---+------+------+----+----+-------+
| 1| 110| aaa|walk|work| 1|
| 2| 110| aaa|walk|work| 1|
| 3| 110| aaa| bus|work| 1|
| 4| 110| aaa| bus|work| 1|
| 5| 110| aaa|walk|work| 1|
| 6| 110| bbb|walk|home| 6|
| 7| 110| bbb| bus|home| 6|
| 8| 110| bbb| bus|home| 6|
| 9| 110| bbb|walk|home| 6|
+---+------+------+----+----+-------+
To generate a subtrip_id for each trip_id, I used:
df_subtrip = df_trip.withColumn("subtrip_id", F.row_number().over(Window.partitionBy(['p_uuid', 'u_uuid', 'dest', 'mode']).orderBy('idx')))
+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
| 1| 110| aaa|walk|work| 1| 122|
| 2| 110| aaa|walk|work| 1| 122|
| 3| 110| aaa| bus|work| 1| 123|
| 4| 110| aaa| bus|work| 1| 123|
| 5| 110| aaa|walk|work| 1| 124|
| 6| 110| bbb|walk|home| 6| 997|
| 7| 110| bbb| bus|home| 6| 998|
| 8| 110| bbb| bus|home| 6| 998|
| 9| 110| bbb|walk|home| 6| 999|
+---+------+------+----+----+-------+----------+
Oops!! This isn't what I'm looking for; the problem is that I can't create an incremental sub_id like this.
What I'm looking for:
+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
| 1| 110| aaa|walk|work| 1| 1|
| 2| 110| aaa|walk|work| 1| 1|
| 3| 110| aaa| bus|work| 1| 2|
| 4| 110| aaa| bus|work| 1| 2|
| 5| 110| aaa|walk|work| 1| 3|
| 6| 110| bbb|walk|home| 6| 1|
| 7| 110| bbb| bus|home| 6| 2|
| 8| 110| bbb| bus|home| 6| 2|
| 9| 110| bbb|walk|home| 6| 3|
+---+------+------+----+----+-------+----------+
You currently don't take the mode value of the previous row into account in your df_subtrip statement. I also think your trip_id statement can cause an OOM exception, because all of your data is loaded into a single partition. Please have a look at the commented example below:
import sys
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([
(1, 110, 'aaa', 'walk', 'work'),
(2, 110, 'aaa', 'walk', 'work'),
(3, 110, 'aaa', 'bus', 'work'),
(4, 110, 'aaa', 'bus', 'work'),
(5, 110, 'aaa', 'walk','work'),
(6, 110, 'bbb', 'walk', 'home'),
(7, 110, 'bbb', 'bus', 'home'),
(8, 110, 'bbb', 'bus', 'home'),
(9, 110, 'bbb', 'walk', 'home')
],
['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
)
df.show()
#your trip_id statement will load all your data into one partition, which isn't recommended and can cause OOM
#df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest')))
#the following could(!) increase the performance
df = df.repartition('u_uuid', 'p_uuid', 'dest')
df_trip = df.withColumn("trip_id", F.spark_partition_id())
df_trip.show()
defaultW = Window.partitionBy('u_uuid', 'p_uuid', 'dest').orderBy('idx')
#mark the first row of each group with 1
df_subtrip = df_trip.withColumn("subtrip_id", F.when(df_trip.mode != F.lag(df_trip.mode, default='SOMETHING').over(defaultW), 1).otherwise(None))
#gives each first row of a group a row_number
df_subtrip = df_subtrip.withColumn("subtrip_id", F.when(df_subtrip.subtrip_id == 1 , F.row_number().over(Window.partitionBy('u_uuid', 'p_uuid', 'dest', 'subtrip_id' ).orderBy('idx'))).otherwise(None))
#forward-fill the empty subtrip_id's
df_subtrip = df_subtrip.withColumn('subtrip_id', F.last('subtrip_id', True).over(defaultW.rowsBetween(-sys.maxsize,0)))
df_subtrip.sort('idx').show()
Output:
+---+------+------+----+----+
|idx|u_uuid|p_uuid|mode|dest|
+---+------+------+----+----+
| 1| 110| aaa|walk|work|
| 2| 110| aaa|walk|work|
| 3| 110| aaa| bus|work|
| 4| 110| aaa| bus|work|
| 5| 110| aaa|walk|work|
| 6| 110| bbb|walk|home|
| 7| 110| bbb| bus|home|
| 8| 110| bbb| bus|home|
| 9| 110| bbb|walk|home|
+---+------+------+----+----+
+---+------+------+----+----+-------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|
+---+------+------+----+----+-------+
| 5| 110| aaa|walk|work| 43|
| 1| 110| aaa|walk|work| 43|
| 2| 110| aaa|walk|work| 43|
| 3| 110| aaa| bus|work| 43|
| 4| 110| aaa| bus|work| 43|
| 6| 110| bbb|walk|home| 62|
| 7| 110| bbb| bus|home| 62|
| 8| 110| bbb| bus|home| 62|
| 9| 110| bbb|walk|home| 62|
+---+------+------+----+----+-------+
+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
| 1| 110| aaa|walk|work| 43| 1|
| 2| 110| aaa|walk|work| 43| 1|
| 3| 110| aaa| bus|work| 43| 2|
| 4| 110| aaa| bus|work| 43| 2|
| 5| 110| aaa|walk|work| 43| 3|
| 6| 110| bbb|walk|home| 62| 1|
| 7| 110| bbb| bus|home| 62| 2|
| 8| 110| bbb| bus|home| 62| 2|
| 9| 110| bbb|walk|home| 62| 3|
+---+------+------+----+----+-------+----------+
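One side note on the forward-fill step above: -sys.maxsize works, but PySpark also provides the named constants Window.unboundedPreceding and Window.currentRow for the same frame. A sketch of the equivalent line, assuming the same defaultW and df_subtrip as above:
#forward-fill using the named frame bounds instead of -sys.maxsize (same behaviour)
df_subtrip = df_subtrip.withColumn('subtrip_id', F.last('subtrip_id', True).over(defaultW.rowsBetween(Window.unboundedPreceding, Window.currentRow)))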
As an optimization of cronoik's answer, you can take advantage of the fact that you already have a globally unique 'idx' column on your dataset and set 'trip_id' and 'subtrip_id' to the idx of the row where the trip or subtrip starts.
This would let you generate trip_id and subtrip_id in a single pass with monotonically increasing integers.
Since you need subtrip_id to start at 1 and increase in increments of 1, you go over the window a second time:
import sys
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame([
(1, 110, 'aaa', 'walk', 'work'),
(2, 110, 'aaa', 'walk', 'work'),
(3, 110, 'aaa', 'bus', 'work'),
(4, 110, 'aaa', 'bus', 'work'),
(5, 110, 'aaa', 'walk','work'),
(6, 110, 'bbb', 'walk', 'home'),
(7, 110, 'bbb', 'bus', 'home'),
(8, 110, 'bbb', 'bus', 'home'),
(9, 110, 'bbb', 'walk', 'home')
],
['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
)
tripW = Window.partitionBy('u_uuid', 'p_uuid', 'dest').orderBy('idx')
df_final = df\
.withColumn("trip_id", F.first('idx').over(tripW))\
.withColumn('lagged_mode', F.lag('mode', default='').over(tripW))\
.withColumn('subtrip_id', (F.col('mode') != F.col('lagged_mode')).cast('int'))\
.withColumn('subtrip_id',F.sum('subtrip_id').over(tripW))\
.drop('lagged_mode')\
.sort('idx')
df_final.show()
+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
| 1| 110| aaa|walk|work| 1| 1|
| 2| 110| aaa|walk|work| 1| 1|
| 3| 110| aaa| bus|work| 1| 2|
| 4| 110| aaa| bus|work| 1| 2|
| 5| 110| aaa|walk|work| 1| 3|
| 6| 110| bbb|walk|home| 6| 1|
| 7| 110| bbb| bus|home| 6| 2|
| 8| 110| bbb| bus|home| 6| 2|
| 9| 110| bbb|walk|home| 6| 3|
+---+------+------+----+----+-------+----------+
If you look at the resulting plan, Spark can build this very efficiently:
df_final.explain()
TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#12637,u_uuid#12638,p_uuid#2863,mode#2864,dest#2865,trip_id#12642,subtrip_id#12643])
+- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, subtrip_id#12614L]
+- Window [sum(cast(subtrip_id#12604 as bigint)) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#12614L], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, cast(NOT (mode#2864 = lagged_mode#12596) as int) AS subtrip_id#12604]
+- Window [first(idx#2861L, false) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS trip_id#12589L, lag(mode#2864, 1, ) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS lagged_mode#12596], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
+- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]
cronoik's solution uses different sorting/window criteria, so it takes more expensive sort steps and has a longer execution time:
df_subtrip.sort('idx').explain()
TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#2948,u_uuid#2949,p_uuid#2863,mode#2864,dest#2865,trip_id#2953,subtrip_id#2954])
+- *(4) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, subtrip_id#2933]
+- Window [last(subtrip_id#2923, true) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#2933], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(3) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN (subtrip_id#2913 = 1) THEN _we0#2924 ELSE null END AS subtrip_id#2923]
+- Window [row_number() windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#2924], [u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913], [idx#2861L ASC NULLS FIRST]
+- *(2) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, subtrip_id#2913 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN NOT (mode#2864 = _we0#2914) THEN 1 ELSE null END AS subtrip_id#2913]
+- Window [lag(mode#2864, 1, SOMETHING) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#2914], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
+- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
+- *(1) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, SPARK_PARTITION_ID() AS trip_id#2887]
+- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
+- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]
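If you need this logic in more than one place, the df_final chain above can be wrapped into a small helper function. This is just a sketch (the function name and parameter defaults are mine, hypothetical; the logic is exactly the one shown above):
def add_trip_ids(df, group_cols=('u_uuid', 'p_uuid', 'dest'), order_col='idx', mode_col='mode'):
    # same window as tripW above, parameterised on the column names
    w = Window.partitionBy(*group_cols).orderBy(order_col)
    return (df
        .withColumn('trip_id', F.first(order_col).over(w))
        # flag rows where the mode differs from the previous row of the trip
        .withColumn('mode_changed', (F.col(mode_col) != F.lag(mode_col, default='').over(w)).cast('int'))
        # running sum of the flags yields 1, 1, 2, 2, 3, ... per trip
        .withColumn('subtrip_id', F.sum('mode_changed').over(w))
        .drop('mode_changed'))

df_final = add_trip_ids(df).sort('idx')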