PySpark conditional increment
I'm new to PySpark and I'm trying to convert some Python code that derives a new variable 'COUNT_IDX'. The new variable starts at 1, but increments by 1 whenever a condition is met; otherwise it keeps the same value as in the previous record.
The conditions for incrementing are:
TRIP_CD is not equal to the previous record's TRIP_CD,
or SIGN is not equal to the previous record's SIGN,
or time_diff is not equal to 1.
Python code (pandas DataFrame):

df['COUNT_IDX'] = 1

for i in range(1, len(df)):
    if ((df['TRIP_CD'].iloc[i] != df['TRIP_CD'].iloc[i - 1])
            or (df['SIGN'].iloc[i] != df['SIGN'].iloc[i - 1])
            or df['time_diff'].iloc[i] != 1):
        df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i - 1] + 1
    else:
        df['COUNT_IDX'].iloc[i] = df['COUNT_IDX'].iloc[i - 1]
This is the expected result:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 3
2711 - 1 3
2854 - 1 4
2854 + 1 5
In PySpark, I initialized COUNT_IDX to 1. Then, using a Window function, I computed the lags of TRIP_CD and SIGN and calculated time_diff, and then tried:
df = sqlContext.sql('''
    select TRIP, TRIP_CD, SIGN, TIME_STAMP, seconds_diff,
        case when TRIP_CD != TRIP_lag or SIGN != SIGN_lag or seconds_diff != 1
             then (lag(COUNT_INDEX) over (partition by TRIP order by TRIP, TIME_STAMP)) + 1
             else (lag(COUNT_INDEX) over (partition by TRIP order by TRIP, TIME_STAMP))
        end as COUNT_INDEX
    from df''')
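For context, the lag columns referenced in the query (TRIP_lag, SIGN_lag, seconds_diff) were built along these lines (a rough sketch, not the exact code):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# one window per trip, ordered by timestamp
w = Window.partitionBy('TRIP').orderBy('TIME_STAMP')

df = (df
      .withColumn('TRIP_lag', F.lag('TRIP_CD').over(w))
      .withColumn('SIGN_lag', F.lag('SIGN').over(w))
      # seconds elapsed since the previous record
      .withColumn('seconds_diff',
                  F.col('TIME_STAMP').cast('long')
                  - F.lag(F.col('TIME_STAMP').cast('long')).over(w)))

df.registerTempTable('df')  # so the SQL query can refer to it as "df"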
Running that query gives me something like this:
TRIP_CD SIGN time_diff COUNT_IDX
2711 - 1 1
2711 - 1 1
2711 + 2 2
2711 - 1 2
2711 - 1 1
2854 - 1 2
2854 + 1 2
When COUNT_IDX is updated on a previous record, the COUNT_IDX on the current record doesn't see that change when it is calculated. It's as if COUNT_IDX isn't being overwritten, or isn't being evaluated row by row. Any ideas on how to work around this?
What you need here is a cumulative sum:
-- cumulative sum
SUM(CAST(
-- if at least one condition has been satisfied
-- we take 1 otherwise 0
TRIP_CD != TRIP_lag OR SIGN != SIGN_lag OR seconds_diff != 1 AS LONG
)) OVER W
...
WINDOW W AS (PARTITION BY TRIP ORDER BY TIME_STAMP)
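Put together with the DataFrame API, a minimal sketch (it assumes the lag columns from the question already exist; the comparison is NULL on the first row of each trip because the lags are NULL, so it is coalesced to 1 to make the count start at 1, matching the expected output):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('TRIP').orderBy('TIME_STAMP')

# 1 when at least one condition holds, 0 otherwise
changed = ((F.col('TRIP_CD') != F.col('TRIP_lag'))
           | (F.col('SIGN') != F.col('SIGN_lag'))
           | (F.col('seconds_diff') != 1)).cast('long')

# running sum of the change flag gives the desired COUNT_IDX
df = df.withColumn('COUNT_IDX',
                   F.sum(F.coalesce(changed, F.lit(1))).over(w))

Because the window has an ORDER BY, SUM defaults to a running total from the start of the partition up to the current row, which is exactly the cumulative sum described above.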