PySpark - 添加一个递增的数字列,该列根据另一个列值的变化重置为 1

PySpark - Add an incrementing number column which resets to 1 based on another column value changing

首先我应该说我对 Python 和 PySpark 都很陌生,我的大部分经验都在 MS SQL、C#、VB.NET 等方面。 ....

我有一个要添加 'group_number' 字段的数据框。我需要这个数字根据日期时间字段递增,并根据值字段重置。所以我希望输出如:

+-----+----------------+-------------+
|value|datetime        |group_number |
+-----+----------------+-------------+
|00001|2020-01-01 00:00|1            |
|00001|2020-01-01 02:10|2            |
|00001|2020-01-01 05:14|3            |
|00002|2020-01-01 00:03|1            |
|00002|2020-01-01 02:04|2            |
|00003|2020-01-01 03:03|1            |
+-----+----------------+-------------+

日期时间值有点无关紧要,因为它们可以在不同的点开始和结束,并在每个组内增加不同的数量,我只需要一个数字(1 到 x)来排序每个 'value'按时间顺序排列。

我已经编写了一个 udf 来尝试执行此操作,但我认为它对它们的排序不正确,我最终得到的大部分是“1”值和偶尔的“2”。

udf 定义为:

def createGroupID(value):
    global iterationCount
    global currentValue

    if value == currentValue:
        iterationCount = iterationCount + 1
        return iterationCount

    iterationCount = 1
    currentValue = value
    return iterationCount

两个全局变量在主应用程序中初始化,udf 被调用为:

    createCountNumber = udf(createGroupID, StringType())
    newdf = df.withColumn("group_number", createCountNumber('value'))

如果有人能帮我解决这个问题,我将不胜感激!非常感谢。

将数据读取为 pandas 数据框。然后您可以按值分组并进行累计计数:

import pandas as pd
df = pd.read_excel(r'file_path')
df['seq'] = df.groupby(['value', 'datetime']).cumcount()+1
df

感谢 Anil 为我指明了正确的方向....我在

找到了完整的解决方案

我需要添加以下内容:

    w = Window.partitionBy("value")
    df = df.withColumn("count", count("*").over(w))\
        .withColumn("group_number", row_number().over(w.orderBy("datetime")))

现在我有了我需要的东西!

哦,我还需要添加一行让我使用上面代码块中的所有功能:

from pyspark.sql.functions import col, size, lit, udf, concat, row_number, count, when