How to generate an incremental, non-unique sub_id using PySpark

My goal is to create a random id and an incremental sub_id; you will find a more detailed explanation of my problem below.

Initial dataframe:

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk','work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus',  'home'),
                            (9, 110, 'bbb', 'walk', 'home')
                        ],
                        ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
                    )

df.show()

+---+------+------+----+----+
|idx|u_uuid|p_uuid|mode|dest|
+---+------+------+----+----+
|  1|   110|   aaa|walk|work|
|  2|   110|   aaa|walk|work|
|  3|   110|   aaa| bus|work|
|  4|   110|   aaa| bus|work|
|  5|   110|   aaa|walk|work|
|  6|   110|   bbb|walk|home|
|  7|   110|   bbb| bus|home|
|  8|   110|   bbb| bus|home|
|  9|   110|   bbb|walk|home|
+---+------+------+----+----+

To generate the trip_id (which can be random), I used:

import pyspark.sql.functions as F
from pyspark.sql import Window

df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest'))).sort('idx')

+---+------+------+----+----+-------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|
+---+------+------+----+----+-------+
|  1|   110|   aaa|walk|work|      1|
|  2|   110|   aaa|walk|work|      1|
|  3|   110|   aaa| bus|work|      1|
|  4|   110|   aaa| bus|work|      1|
|  5|   110|   aaa|walk|work|      1|
|  6|   110|   bbb|walk|home|      6|
|  7|   110|   bbb| bus|home|      6|
|  8|   110|   bbb| bus|home|      6|
|  9|   110|   bbb|walk|home|      6|
+---+------+------+----+----+-------+

To generate a subtrip_id within each trip_id, I used:

df_subtrip = df_trip.withColumn("subtrip_id", F.row_number().over(Window.partitionBy(['p_uuid', 'u_uuid', 'dest', 'mode']).orderBy('idx')))

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|      1|       122|
|  2|   110|   aaa|walk|work|      1|       122|
|  3|   110|   aaa| bus|work|      1|       123|
|  4|   110|   aaa| bus|work|      1|       123|
|  5|   110|   aaa|walk|work|      1|       124|
|  6|   110|   bbb|walk|home|      6|       997|
|  7|   110|   bbb| bus|home|      6|       998|
|  8|   110|   bbb| bus|home|      6|       998|
|  9|   110|   bbb|walk|home|      6|       999|
+---+------+------+----+----+-------+----------+

Oops!! This isn't what I'm looking for; the problem is that I can't create an incremental sub_id like the one below.

What I'm looking for:

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|      1|         1|
|  2|   110|   aaa|walk|work|      1|         1|
|  3|   110|   aaa| bus|work|      1|         2|
|  4|   110|   aaa| bus|work|      1|         2|
|  5|   110|   aaa|walk|work|      1|         3|
|  6|   110|   bbb|walk|home|      6|         1|
|  7|   110|   bbb| bus|home|      6|         2|
|  8|   110|   bbb| bus|home|      6|         2|
|  9|   110|   bbb|walk|home|      6|         3|
+---+------+------+----+----+-------+----------+

Your df_subtrip statement currently doesn't take the mode value of the previous row into account, and I also think your trip_id statement can cause an OOM exception, because all of your data gets loaded into a single partition. Please have a look at the commented example below:

import sys
import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk','work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus',  'home'),
                            (9, 110, 'bbb', 'walk', 'home')
                        ],
                        ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
                    )

df.show()

#your trip_id statement will load all of your data into one partition, which isn't recommended and can cause OOM
#df_trip = df.withColumn("trip_id", F.rank().over(Window.orderBy('u_uuid', 'p_uuid', 'dest')))
#the following could(!) improve performance
df = df.repartition('u_uuid', 'p_uuid', 'dest')
df_trip = df.withColumn("trip_id", F.spark_partition_id())

df_trip.show()

defaultW = Window.partitionBy('u_uuid', 'p_uuid', 'dest').orderBy('idx')

#mark each row whose mode differs from the previous row (i.e. the first row of each mode run) with 1
df_subtrip = df_trip.withColumn("subtrip_id", F.when(df_trip.mode != F.lag(df_trip.mode, default='SOMETHING').over(defaultW), 1).otherwise(None))

#give each of these marked rows a consecutive row_number within its trip
df_subtrip = df_subtrip.withColumn("subtrip_id", F.when(df_subtrip.subtrip_id == 1 , F.row_number().over(Window.partitionBy('u_uuid', 'p_uuid', 'dest', 'subtrip_id' ).orderBy('idx'))).otherwise(None))

#forward-fill the empty subtrip_id's
df_subtrip = df_subtrip.withColumn('subtrip_id', F.last('subtrip_id', True).over(defaultW.rowsBetween(-sys.maxsize,0)))

df_subtrip.sort('idx').show()

Output:

+---+------+------+----+----+
|idx|u_uuid|p_uuid|mode|dest|
+---+------+------+----+----+
|  1|   110|   aaa|walk|work|
|  2|   110|   aaa|walk|work|
|  3|   110|   aaa| bus|work|
|  4|   110|   aaa| bus|work|
|  5|   110|   aaa|walk|work|
|  6|   110|   bbb|walk|home|
|  7|   110|   bbb| bus|home|
|  8|   110|   bbb| bus|home|
|  9|   110|   bbb|walk|home|
+---+------+------+----+----+

+---+------+------+----+----+-------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|
+---+------+------+----+----+-------+
|  5|   110|   aaa|walk|work|     43|
|  1|   110|   aaa|walk|work|     43|
|  2|   110|   aaa|walk|work|     43|
|  3|   110|   aaa| bus|work|     43|
|  4|   110|   aaa| bus|work|     43|
|  6|   110|   bbb|walk|home|     62|
|  7|   110|   bbb| bus|home|     62|
|  8|   110|   bbb| bus|home|     62|
|  9|   110|   bbb|walk|home|     62|
+---+------+------+----+----+-------+

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|     43|         1|
|  2|   110|   aaa|walk|work|     43|         1|
|  3|   110|   aaa| bus|work|     43|         2|
|  4|   110|   aaa| bus|work|     43|         2|
|  5|   110|   aaa|walk|work|     43|         3|
|  6|   110|   bbb|walk|home|     62|         1|
|  7|   110|   bbb| bus|home|     62|         2|
|  8|   110|   bbb| bus|home|     62|         2|
|  9|   110|   bbb|walk|home|     62|         3|
+---+------+------+----+----+-------+----------+
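
Side note: the -sys.maxsize lower bound used in the forward-fill works, but the same frame can also be expressed with the named constants Window.unboundedPreceding and Window.currentRow. A minimal equivalent of that last step, assuming df_subtrip and defaultW from the code above (this variant is not part of the original answer):

#equivalent forward-fill using the named frame-bound constants instead of -sys.maxsize
ffillW = defaultW.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_subtrip = df_subtrip.withColumn('subtrip_id', F.last('subtrip_id', True).over(ffillW))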

As an optimisation of cronoik's answer, you can take advantage of the fact that your dataset already has a globally unique 'idx' column, and set 'trip_id' (and, in principle, 'subtrip_id') to the idx of the row at which the trip or subtrip starts.

This lets you generate trip_id and subtrip_id in a single pass over monotonically increasing integers. Since you need subtrip_id to start at 1 and increase in steps of 1, the second window application instead sums a 'mode changed' flag over the trip window:

import sys
import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
                            (1, 110, 'aaa', 'walk', 'work'),
                            (2, 110, 'aaa', 'walk', 'work'),
                            (3, 110, 'aaa', 'bus', 'work'),
                            (4, 110, 'aaa', 'bus', 'work'),
                            (5, 110, 'aaa', 'walk','work'),
                            (6, 110, 'bbb', 'walk', 'home'),
                            (7, 110, 'bbb', 'bus', 'home'),
                            (8, 110, 'bbb', 'bus',  'home'),
                            (9, 110, 'bbb', 'walk', 'home')
                        ],
                        ['idx', 'u_uuid', 'p_uuid', 'mode', 'dest']
                    )

tripW = Window.partitionBy('u_uuid', 'p_uuid', 'dest').orderBy('idx')

df_final  = df\
  .withColumn("trip_id", F.first('idx').over(tripW))\
  .withColumn('lagged_mode', F.lag('mode', default='').over(tripW))\
  .withColumn('subtrip_id', (F.col('mode') != F.col('lagged_mode')).cast('int'))\
  .withColumn('subtrip_id',F.sum('subtrip_id').over(tripW))\
  .drop('lagged_mode')\
  .sort('idx')

df_final.show()

+---+------+------+----+----+-------+----------+
|idx|u_uuid|p_uuid|mode|dest|trip_id|subtrip_id|
+---+------+------+----+----+-------+----------+
|  1|   110|   aaa|walk|work|      1|         1|
|  2|   110|   aaa|walk|work|      1|         1|
|  3|   110|   aaa| bus|work|      1|         2|
|  4|   110|   aaa| bus|work|      1|         2|
|  5|   110|   aaa|walk|work|      1|         3|
|  6|   110|   bbb|walk|home|      6|         1|
|  7|   110|   bbb| bus|home|      6|         2|
|  8|   110|   bbb| bus|home|      6|         2|
|  9|   110|   bbb|walk|home|      6|         3|
+---+------+------+----+----+-------+----------+
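
To see why the running sum yields 1, 1, 2, 2, 3, it can help to keep the intermediate columns around. A small illustrative variant of the same steps, assuming df and tripW from the code above (the lagged_mode / mode_changed column names are only for inspection and not part of the original answer):

df_steps = df\
  .withColumn('lagged_mode', F.lag('mode', default='').over(tripW))\
  .withColumn('mode_changed', (F.col('mode') != F.col('lagged_mode')).cast('int'))\
  .withColumn('subtrip_id', F.sum('mode_changed').over(tripW))\
  .sort('idx')

df_steps.show()
#for p_uuid 'aaa' the mode sequence walk, walk, bus, bus, walk gives mode_changed = 1, 0, 1, 0, 1,
#so the cumulative sum over the trip window yields subtrip_id = 1, 1, 2, 2, 3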

If you look at the generated plan, you can see that Spark can build this very efficiently:

df_final.explain()

TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#12637,u_uuid#12638,p_uuid#2863,mode#2864,dest#2865,trip_id#12642,subtrip_id#12643])
+- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, subtrip_id#12614L]
   +- Window [sum(cast(subtrip_id#12604 as bigint)) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#12614L], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
      +- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#12589L, cast(NOT (mode#2864 = lagged_mode#12596) as int) AS subtrip_id#12604]
         +- Window [first(idx#2861L, false) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS trip_id#12589L, lag(mode#2864, 1, ) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS lagged_mode#12596], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
            +- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
                  +- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]

cronoik's solution uses different sort and window criteria, so it requires additional, more expensive sort steps and has a longer execution time:

df_subtrip.sort('idx').explain()

TakeOrderedAndProject(limit=21, orderBy=[idx#2861L ASC NULLS FIRST], output=[idx#2948,u_uuid#2949,p_uuid#2863,mode#2864,dest#2865,trip_id#2953,subtrip_id#2954])
+- *(4) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, subtrip_id#2933]
   +- Window [last(subtrip_id#2923, true) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS subtrip_id#2933], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
      +- *(3) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
         +- *(3) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN (subtrip_id#2913 = 1) THEN _we0#2924 ELSE null END AS subtrip_id#2923]
            +- Window [row_number() windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#2924], [u_uuid#2862L, p_uuid#2863, dest#2865, subtrip_id#2913], [idx#2861L ASC NULLS FIRST]
               +- *(2) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, subtrip_id#2913 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
                  +- *(2) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, trip_id#2887, CASE WHEN NOT (mode#2864 = _we0#2914) THEN 1 ELSE null END AS subtrip_id#2913]
                     +- Window [lag(mode#2864, 1, SOMETHING) windowspecdefinition(u_uuid#2862L, p_uuid#2863, dest#2865, idx#2861L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#2914], [u_uuid#2862L, p_uuid#2863, dest#2865], [idx#2861L ASC NULLS FIRST]
                        +- *(1) Sort [u_uuid#2862L ASC NULLS FIRST, p_uuid#2863 ASC NULLS FIRST, dest#2865 ASC NULLS FIRST, idx#2861L ASC NULLS FIRST], false, 0
                           +- *(1) Project [idx#2861L, u_uuid#2862L, p_uuid#2863, mode#2864, dest#2865, SPARK_PARTITION_ID() AS trip_id#2887]
                              +- Exchange hashpartitioning(u_uuid#2862L, p_uuid#2863, dest#2865, 200)
                                 +- Scan ExistingRDD[idx#2861L,u_uuid#2862L,p_uuid#2863,mode#2864,dest#2865]
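
If both dataframes from the two answers are still available in the same session, you can double-check that they agree on the subtrip_id values (the trip_id values differ by construction) with an anti-join on the shared columns; this check is only a suggestion and not part of either answer:

#rows where the two solutions disagree on subtrip_id; an empty result means they match
diff = df_final.select('idx', 'subtrip_id')\
  .join(df_subtrip.select('idx', 'subtrip_id'), on=['idx', 'subtrip_id'], how='left_anti')

print(diff.count())   #expected: 0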