将最大值时间戳放入 PySpark 的数组中

Put largest value timestamp in array in PySpark

我有一个包含以下列的 PySpark 数据框(比如 df1

1.> category - 包含独特的类别类型

2.> start_time_array - 按升序排列的时间戳数组

3.> end_time_array - 按升序排列的时间戳数组

4.> lenStart - start_time_array

中的数组长度

5.> lenEnd - end_time_array

中的数组长度

以下是 df1 的示例:

+--------+------------------------------------------+------------------------------------------+--------+------+
|category|                            end_time_array|                          start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
|       A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]|       2|     2|
|       B|                     [2017-02-18 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]|       2|     1|
+--------+------------------------------------------+------------------------------------------+--------+------+

还有另一个数据框 df2,它包含两列 categorytimestampdf2 包含与 df1 相同的 category 值,df1 中数组内的时间戳值是 df2 中时间戳的子集。以下是 df2

的示例
+--------+-------------------+
|category|          timestamp|
+--------+-------------------+
|       A|2017-01-16 00:00:00|
|       A|2017-01-18 00:00:00|
|       A|2017-01-25 00:00:00|
|       A|2017-01-27 00:00:00|
|       B|2017-02-14 00:00:00|
|       B|2017-02-18 00:00:00|
|       B|2017-02-21 00:00:00|
|       B|2017-02-22 00:00:00|
|       B|2017-02-24 00:00:00|
|       B|2017-02-25 00:00:00|
+--------+-------------------+

正如我们在上面 df1 的例子中看到的,对于 category -> BlenStart=2 不等于 lenEnd=1。在 df1 的所有行中,lenStart = lenEndlenStart = lenEnd+1 对于 df1lenStart = lenEnd+1 的所有行,我想取timestamp(适当的category)的最大值,并将其放在end_time_array中的数组中。我该怎么做?

以下是使用 df2

中的信息处理 df1 后的预期输出
+--------+------------------------------------------+------------------------------------------+--------+------+
|category|                            end_time_array|                          start_time_array|lenStart|lenEnd|
+--------+------------------------------------------+------------------------------------------+--------+------+
|       A|[2017-01-18 00:00:00, 2017-01-27 00:00:00]|[2017-01-16 00:00:00, 2017-01-25 00:00:00]|       2|     2|
|       B|[2017-02-18 00:00:00, 2017-02-25 00:00:00]|[2017-02-14 00:00:00, 2017-02-21 00:00:00]|       2|     2|
+--------+------------------------------------------+------------------------------------------+--------+------+

这应该适用于 Spark 1.5+:

import pyspark.sql.functions as F
df3 = df1.where(F.col('lenStart') == (F.col('lenEnd') + 1)).select('category')
df4 = df2.join(df3, 'Category').groupby('Category').agg(F.max('timestamp').alias('max'))
df5 = df1.join(df4, 'Category', 'left')
df1_changed = df5.withColumn('end_time_array', F.when(F.col('max').isNull(),
    F.col('end_time_array')).otherwise(F.concat(F.col('end_time_array'),
                                                F.array(F.col('max')))))
df1_changed = df1_changed.withColumn('lenEnd', F.size(F.col('end_time_array')))

df1_changed 将有一个修改过的 end_time_array 列,当您请求的条件适用时,它会添加所需的值,否则,它保持不变。