计算当前行中的值与每组第一行中的值之间的差异 - pyspark

Calculate difference between value in current row and value in first row per group - pyspark

我有这个数据框:

DataFrame[date: string, t: string, week: string, a: bigint, b: bigint]

具有以下数据:

+---------+--+--------+---+---+
|date     |t |week    |a  |b  |
+---------+--+--------+---+---+
|20180328 |1 |2018-W10|31 |35 |
|20180328 |1 |2018-W11|18 |37 |
|20180328 |1 |2018-W12|19 |37 |
|20180328 |1 |2018-W13|19 |38 |
|20180328 |1 |2018-W14|20 |38 |
|20180328 |1 |2018-W15|22 |39 |
|20180328 |1 |2018-W16|23 |39 |
|20180328 |1 |2018-W17|24 |40 |
|20180328 |1 |2018-W18|25 |40 |
|20180328 |1 |2018-W19|25 |41 |
|20180328 |1 |2018-W20|26 |41 |
|20180328 |1 |2018-W21|26 |41 |
|20180328 |1 |2018-W22|26 |41 |
|20180328 |2 |2018-W10|14 |26 |
|20180328 |2 |2018-W11|82 |33 |
|20180328 |2 |2018-W12|87 |36 |
|20180328 |2 |2018-W13|89 |39 |
|20180328 |2 |2018-W14|10 |45 |
|20180328 |2 |2018-W15|10 |45 |
|20180328 |2 |2018-W16|11 |48 |
|20180328 |2 |2018-W17|11 |55 |
|20180328 |2 |2018-W18|11 |60 |
|20180328 |2 |2018-W19|11 |70 |
|20180328 |2 |2018-W20|11 |79 |
|20180328 |2 |2018-W21|11 |86 |
|20180328 |2 |2018-W22|12 |93 |
+---------+--+--------+---+---+

我想添加一个新列,对于每个日期和类型(列 t),该行与列 b 的该日期的第一周之间的差异。

像这样:

+---------+--+--------+---+---+---+
|date     |t |week    |a  |b  |h  |
+---------+--+--------+---+---+---+
|20180328 |1 |2018-W10|31 |35 |0  | 
|20180328 |1 |2018-W11|18 |37 |2  |
|20180328 |1 |2018-W12|19 |37 |2  |
|20180328 |1 |2018-W13|19 |38 |3  |
|20180328 |1 |2018-W14|20 |38 |3  |
|20180328 |1 |2018-W15|22 |39 |4  |
|20180328 |1 |2018-W16|23 |39 |4  |
|20180328 |1 |2018-W17|24 |40 |5  |
|20180328 |1 |2018-W18|25 |40 |5  |
|20180328 |1 |2018-W19|25 |41 |6  |
|20180328 |1 |2018-W20|26 |41 |6  |
|20180328 |1 |2018-W21|26 |41 |6  | 
|20180328 |1 |2018-W22|26 |41 |6  | 
|20180328 |2 |2018-W10|14 |26 |0  | 
|20180328 |2 |2018-W11|82 |33 |7  | 
|20180328 |2 |2018-W12|87 |36 |10 | 
|20180328 |2 |2018-W13|89 |39 |13 | 
|20180328 |2 |2018-W14|10 |45 |19 | 
|20180328 |2 |2018-W15|10 |45 |19 | 
|20180328 |2 |2018-W16|11 |48 |22 | 
|20180328 |2 |2018-W17|11 |55 |29 | 
|20180328 |2 |2018-W18|11 |60 |34 | 
|20180328 |2 |2018-W19|11 |70 |44 | 
|20180328 |2 |2018-W20|11 |79 |53 | 
|20180328 |2 |2018-W21|11 |86 |60 | 
|20180328 |2 |2018-W22|12 |93 |67 | 
+---------+--+--------+---+---+---+

h 列中的每个数字都是 col('b') 中的值 - col('b') 中该类型在 W10 的值。

您可以使用 pyspark.sql.Window.

按列 't' 分区并按列 'week' 排序。这是有效的,因为对您的周列进行排序将按字典顺序排序,并且 'W10' 将是您的组的第一个值。如果不是这种情况,您将需要找到另一种方法来对列进行排序,以便顺序是您想要的。

这是一个简化的例子。

data = [
    ('20180328',1,'2018-W10',31,35),
    ('20180328',1,'2018-W11',18,37),
    ('20180328',1,'2018-W12',19,37),
    ('20180328',1,'2018-W13',19,38),
    ('20180328',1,'2018-W14',20,38),
    ('20180328',2,'2018-W10',14,26),
    ('20180328',2,'2018-W11',82,33),
    ('20180328',2,'2018-W12',87,36),
    ('20180328',2,'2018-W13',89,39)
]

df = sqlCtx.createDataFrame(data, ['date', 't', 'week', 'a', 'b'])
df.show()
#+--------+---+--------+---+---+
#|    date|  t|    week|  a|  b|
#+--------+---+--------+---+---+
#|20180328|  1|2018-W10| 31| 35|
#|20180328|  1|2018-W11| 18| 37|
#|20180328|  1|2018-W12| 19| 37|
#|20180328|  1|2018-W13| 19| 38|
#|20180328|  1|2018-W14| 20| 38|
#|20180328|  2|2018-W10| 14| 26|
#|20180328|  2|2018-W11| 82| 33|
#|20180328|  2|2018-W12| 87| 36|
#|20180328|  2|2018-W13| 89| 39|
#+--------+---+--------+---+---+

使用 pyspark DataFrame 函数

定义 Window:

from pyspark.sql import Window   
w = Window.partitionBy('t').orderBy('week')

使用 Window 创建新列:

import pyspark.sql.functions as f

df = df.select('*', (f.col('b') - f.first('b').over(w)).alias('h'))
df.show()
#+--------+---+--------+---+---+---+
#|    date|  t|    week|  a|  b|  h|
#+--------+---+--------+---+---+---+
#|20180328|  1|2018-W10| 31| 35|  0|
#|20180328|  1|2018-W11| 18| 37|  2|
#|20180328|  1|2018-W12| 19| 37|  2|
#|20180328|  1|2018-W13| 19| 38|  3|
#|20180328|  1|2018-W14| 20| 38|  3|
#|20180328|  2|2018-W10| 14| 26|  0|
#|20180328|  2|2018-W11| 82| 33|  7|
#|20180328|  2|2018-W12| 87| 36| 10|
#|20180328|  2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+

使用pyspark-sql

这是使用 pyspark-sql 的等效操作:

df.registerTempTable('myTable')
df = sqlCtx.sql(
    "SELECT *, (b - FIRST(b) OVER (PARTITION BY t ORDER BY week)) AS h FROM myTable"
)
df.show()
#+--------+---+--------+---+---+---+
#|    date|  t|    week|  a|  b|  h|
#+--------+---+--------+---+---+---+
#|20180328|  1|2018-W10| 31| 35|  0|
#|20180328|  1|2018-W11| 18| 37|  2|
#|20180328|  1|2018-W12| 19| 37|  2|
#|20180328|  1|2018-W13| 19| 38|  3|
#|20180328|  1|2018-W14| 20| 38|  3|
#|20180328|  2|2018-W10| 14| 26|  0|
#|20180328|  2|2018-W11| 82| 33|  7|
#|20180328|  2|2018-W12| 87| 36| 10|
#|20180328|  2|2018-W13| 89| 39| 13|
#+--------+---+--------+---+---+---+

相关