计算 PySpark 中列值的两个连续日期之间的唯一 ID

Count unique ids between two consecutive dates that are values of a column in PySpark

我有一个 PySpark DF,带有 ID 和日期列,看起来像这样。

ID Date
1 2021-10-01
2 2021-10-01
1 2021-10-02
3 2021-10-02

我想统计在一天前的日期不存在的唯一 ID 的数量。因此,这里的结果将是 1,因为在 2021-10-02 中只有一个新的唯一 ID。

ID Date Count
1 2021-10-01 -
2 2021-10-01 -
1 2021-10-02 1
3 2021-10-02 1

我尝试遵循 解决方案,但它不适用于日期类型值。任何帮助将不胜感激。 谢谢!

取出当天和前一天的id列表,然后求两者相差的大小,得到最后的结果。

更新解决方案以消除 join

df = df.select('date', F.expr('collect_set(id) over (partition by date) as id_arr')).dropDuplicates() \
    .select('*', F.expr('size(array_except(id_arr, lag(id_arr,1,id_arr) over (order by date))) as count')) \
    .select(F.explode('id_arr').alias('id'), 'date', 'count')
df.show(truncate=False)

如果您想避免自连接(例如出于性能原因),您可以使用 Window 函数:

from pyspark.sql import Row, Window
import datetime

df = spark.createDataFrame([
    Row(ID=1, date=datetime.date(2021,10,1)),
    Row(ID=2, date=datetime.date(2021,10,1)),
    Row(ID=1, date=datetime.date(2021,10,2)),
    Row(ID=2, date=datetime.date(2021,10,2)),
    Row(ID=1, date=datetime.date(2021,10,3)),
    Row(ID=3, date=datetime.date(2021,10,3)),
])

首先添加自上次看到 ID 以来的天数(如果以前从未出现过,则为 None

df = df.withColumn('days_since_last_occurrence', F.datediff('date', F.lag('date').over(Window.partitionBy('ID').orderBy('date'))))

其次,我们添加一个列来标记天数不为 1 的行。我们将 1 添加到该列中,以便我们稍后可以对该列求和以计算行数

df = df.withColumn('is_new', F.when(F.col('days_since_last_occurrence') == 1, None).otherwise(1))

现在我们对具有相同日期的所有行求和,然后删除我们不再需要的列:

(
    df
    .withColumn('count', F.sum('is_new').over(Window.partitionBy('date'))) # sum over all rows with the same date
    .drop('is_new', 'days_since_last_occurrence')
    .sort('date', 'ID')
    .show()
)
# Output:
+---+----------+-----+
| ID|      date|count|
+---+----------+-----+
|  1|2021-10-01|    2|
|  2|2021-10-01|    2|
|  1|2021-10-02| null|
|  2|2021-10-02| null|
|  1|2021-10-03|    1|
|  3|2021-10-03|    1|
+---+----------+-----+