PySpark - 添加一个递增的数字列,该列根据另一个列值的变化重置为 1
PySpark - Add an incrementing number column which resets to 1 based on another column value changing
首先我应该说我对 Python 和 PySpark 都很陌生,我的大部分经验都在 MS SQL、C#、VB.NET 等方面。 ....
我有一个要添加 'group_number' 字段的数据框。我需要这个数字根据日期时间字段递增,并根据值字段重置。所以我希望输出如:
+-----+----------------+-------------+
|value|datetime |group_number |
+-----+----------------+-------------+
|00001|2020-01-01 00:00|1 |
|00001|2020-01-01 02:10|2 |
|00001|2020-01-01 05:14|3 |
|00002|2020-01-01 00:03|1 |
|00002|2020-01-01 02:04|2 |
|00003|2020-01-01 03:03|1 |
+-----+----------------+-------------+
日期时间值有点无关紧要,因为它们可以在不同的点开始和结束,并在每个组内增加不同的数量,我只需要一个数字(1 到 x)来排序每个 'value'按时间顺序排列。
我已经编写了一个 udf 来尝试执行此操作,但我认为它对它们的排序不正确,我最终得到的大部分是“1”值和偶尔的“2”。
udf 定义为:
def createGroupID(value):
global iterationCount
global currentValue
if value == currentValue:
iterationCount = iterationCount + 1
return iterationCount
iterationCount = 1
currentValue = value
return iterationCount
两个全局变量在主应用程序中初始化,udf 被调用为:
createCountNumber = udf(createGroupID, StringType())
newdf = df.withColumn("group_number", createCountNumber('value'))
如果有人能帮我解决这个问题,我将不胜感激!非常感谢。
将数据读取为 pandas 数据框。然后您可以按值分组并进行累计计数:
import pandas as pd
df = pd.read_excel(r'file_path')
df['seq'] = df.groupby(['value', 'datetime']).cumcount()+1
df
感谢 Anil 为我指明了正确的方向....我在
找到了完整的解决方案
我需要添加以下内容:
w = Window.partitionBy("value")
df = df.withColumn("count", count("*").over(w))\
.withColumn("group_number", row_number().over(w.orderBy("datetime")))
现在我有了我需要的东西!
哦,我还需要添加一行让我使用上面代码块中的所有功能:
from pyspark.sql.functions import col, size, lit, udf, concat, row_number, count, when
首先我应该说我对 Python 和 PySpark 都很陌生,我的大部分经验都在 MS SQL、C#、VB.NET 等方面。 ....
我有一个要添加 'group_number' 字段的数据框。我需要这个数字根据日期时间字段递增,并根据值字段重置。所以我希望输出如:
+-----+----------------+-------------+
|value|datetime |group_number |
+-----+----------------+-------------+
|00001|2020-01-01 00:00|1 |
|00001|2020-01-01 02:10|2 |
|00001|2020-01-01 05:14|3 |
|00002|2020-01-01 00:03|1 |
|00002|2020-01-01 02:04|2 |
|00003|2020-01-01 03:03|1 |
+-----+----------------+-------------+
日期时间值有点无关紧要,因为它们可以在不同的点开始和结束,并在每个组内增加不同的数量,我只需要一个数字(1 到 x)来排序每个 'value'按时间顺序排列。
我已经编写了一个 udf 来尝试执行此操作,但我认为它对它们的排序不正确,我最终得到的大部分是“1”值和偶尔的“2”。
udf 定义为:
def createGroupID(value):
global iterationCount
global currentValue
if value == currentValue:
iterationCount = iterationCount + 1
return iterationCount
iterationCount = 1
currentValue = value
return iterationCount
两个全局变量在主应用程序中初始化,udf 被调用为:
createCountNumber = udf(createGroupID, StringType())
newdf = df.withColumn("group_number", createCountNumber('value'))
如果有人能帮我解决这个问题,我将不胜感激!非常感谢。
将数据读取为 pandas 数据框。然后您可以按值分组并进行累计计数:
import pandas as pd
df = pd.read_excel(r'file_path')
df['seq'] = df.groupby(['value', 'datetime']).cumcount()+1
df
感谢 Anil 为我指明了正确的方向....我在
我需要添加以下内容:
w = Window.partitionBy("value")
df = df.withColumn("count", count("*").over(w))\
.withColumn("group_number", row_number().over(w.orderBy("datetime")))
现在我有了我需要的东西!
哦,我还需要添加一行让我使用上面代码块中的所有功能:
from pyspark.sql.functions import col, size, lit, udf, concat, row_number, count, when