Spark SQL window function range boundaries with condition
My data looks like this:
+--------+------+---+
|Sequence|  type| sg|
+--------+------+---+
|       1|  Pump|  3|
|       2|  Pump|  2|
|       3|Inject|  4|
|       4|  Pump|  5|
|       5|  Pump|  3|
|       6|  pump|  6|
|       7|Inject|  7|
|       8|Inject|  8|
|       9|  Pump|  9|
+--------+------+---+
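I want to add a new column that looks at the type value of the previous row.
If the previous row's type is Pump, the new column should be that row's sg value.
If it is Inject, the new column should be the sum of the sg values of all preceding rows, going back until a row with type Pump is found (that Pump row's sg value is included in the sum).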
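For example:
For Sequence = 2, the previous row's type is Pump, so the new column should be that row's sg value: 3.
For Sequence = 9, the previous row's type is Inject, so the new column is the sum of the sg column over the previous 3 rows, because the Sequence = 6 row is the first row (going backwards) with type = Pump. The new value is 8 + 7 + 6 = 21.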
The final output should look like this:
+--------+------+---+------+
|Sequence|  type| sg|New sg|
+--------+------+---+------+
|       1|  Pump|  3|     -|
|       2|  Pump|  2|     3|
|       3|Inject|  4|     2|
|       4|  Pump|  5|     6|
|       5|  Pump|  3|     5|
|       6|  pump|  6|     3|
|       7|Inject|  7|     6|
|       8|Inject|  8|    13|
|       9|  Pump|  9|    21|
+--------+------+---+------+
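Read literally, the rule in the question is a backwards scan from each row. Below is a minimal plain-Python sketch of that reading. It is useful as a reference to check any SQL version against. It assumes "pump" and "Pump" are the same type, because row 6 is lowercase but is treated as a Pump row in the Sequence = 9 example. It also assumes that, if no Pump row is found, the sum covers all preceding rows. The names ROWS, is_pump and new_sg_literal are hypothetical. Under this reading, Sequence = 8 comes out as 7 + 6 = 13.

```python
# Sample data from the question, in Sequence order: (type, sg).
ROWS = [("Pump", 3), ("Pump", 2), ("Inject", 4), ("Pump", 5), ("Pump", 3),
        ("pump", 6), ("Inject", 7), ("Inject", 8), ("Pump", 9)]


def is_pump(row_type):
    # Assumption: type comparison is case-insensitive (row 6 is "pump").
    return row_type.lower() == "pump"


def new_sg_literal(rows):
    """Apply the question's rule row by row (reference version, O(n^2))."""
    result = []
    for i in range(len(rows)):
        if i == 0:
            result.append(None)      # the first row has no previous row
            continue
        prev_type, prev_sg = rows[i - 1]
        if is_pump(prev_type):
            result.append(prev_sg)   # previous row is Pump: take its sg
            continue
        # Previous row is Inject: walk backwards summing sg and stop after
        # the first Pump row (its sg is included). Assumption: with no Pump
        # row at all, the sum covers every preceding row.
        total = 0
        for row_type, sg in reversed(rows[:i]):
            total += sg
            if is_pump(row_type):
                break
        result.append(total)
    return result


print(new_sg_literal(ROWS))  # [None, 3, 2, 6, 5, 3, 6, 13, 21]
```

The Pump special case is written out to mirror the question's wording. The backwards loop would produce the same value without it, because it stops right after adding a Pump row's sg.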
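Based on your rules, this is just a series of window functions. The trick is to put each "pump" row in a group together with the "inject" rows that follow it, and aggregate per group. A cumulative sum of a Pump indicator (that is, a running count of pumps) identifies the groups. Within each group, a running sum of sg up to the current row, read from the previous row with lag(), gives the value you want.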
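The grouping idea can be sketched in plain Python before turning it into SQL. itertools.accumulate plays the role of the cumulative sum that defines pump_grp. A loop that resets at each new group plays the role of the per-group running sum, and shifting the list by one row plays the role of lag(). This is a sketch, assuming case-insensitive type values; the variable names only mirror the column names used in the query.

```python
from itertools import accumulate

# (type, sg) pairs from the question, in Sequence order.
rows = [("Pump", 3), ("Pump", 2), ("Inject", 4), ("Pump", 5), ("Pump", 3),
        ("pump", 6), ("Inject", 7), ("Inject", 8), ("Pump", 9)]

# Running count of Pump rows (case-insensitive): each Pump row opens a new group.
pump_grp = list(accumulate(1 if t.lower() == "pump" else 0 for t, _ in rows))

# Running sum of sg inside each group, restarting whenever the group changes.
pump_sg = []
for i, (_, sg) in enumerate(rows):
    same_group = i > 0 and pump_grp[i] == pump_grp[i - 1]
    pump_sg.append(pump_sg[-1] + sg if same_group else sg)

# Shift down by one row, like SQL lag(): row i takes row i - 1's running sum.
new_sg = [None] + pump_sg[:-1]

print(pump_grp)  # [1, 2, 2, 3, 4, 5, 5, 5, 6]
print(new_sg)    # [None, 3, 2, 6, 5, 3, 6, 13, 21]
```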
The query is then:
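select t.*,
       (case when lower(prev_type) = 'pump' then prev_sg
             else lag(pump_sg) over (order by Sequence)
        end) as your_value
from (select t.*,
             -- running sum of sg within each pump group, up to the current row
             sum(sg) over (partition by pump_grp order by Sequence) as pump_sg
      from (select t.*,
                   lag(sg) over (order by Sequence) as prev_sg,
                   lag(type) over (order by Sequence) as prev_type,
                   -- running count of Pump rows: each Pump row starts a new group
                   sum(case when lower(type) = 'pump' then 1 else 0 end)
                       over (order by Sequence) as pump_grp
            from t
           ) t
     ) t;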
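I think your rules are more complicated than they need to be: you don't need a special case for a previous "pump" row. Each group starts at its Pump row, so the group's sum up to that row is just that row's sg, and lagging it already returns the previous row's sg. So: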
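select t.*,
       lag(pump_sg) over (order by Sequence) as your_value
from (select t.*,
             -- running sum of sg within each pump group, up to the current row
             sum(sg) over (partition by pump_grp order by Sequence) as pump_sg
      from (select t.*,
                   -- running count of Pump rows defines the groups
                   sum(case when lower(type) = 'pump' then 1 else 0 end)
                       over (order by Sequence) as pump_grp
            from t
           ) t
     ) t;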
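The sketch below checks the window-function approach end to end. It runs both variants (with and without the Pump special case) against the sample data and compares them. Assumptions, stated plainly: SQLite is swapped in for Spark only so the example is self-contained (it needs SQLite 3.25+ for window functions), so Spark-specific behaviour is not exercised. Sequence is used as the ordering column, type is compared case-insensitively, and the window sum is a running sum (partition by pump_grp order by Sequence). The subquery aliases g and r, the string names and the run helper are hypothetical.

```python
import sqlite3

# Sample data from the question: (Sequence, type, sg).
ROWS = [(1, "Pump", 3), (2, "Pump", 2), (3, "Inject", 4),
        (4, "Pump", 5), (5, "Pump", 3), (6, "pump", 6),
        (7, "Inject", 7), (8, "Inject", 8), (9, "Pump", 9)]

# Shared inner step: previous row's values plus pump_grp, a running count of
# Pump rows, so every Pump row opens a new group.
GROUPS = """
    select t.*,
           lag(sg) over (order by Sequence) as prev_sg,
           lag(type) over (order by Sequence) as prev_type,
           sum(case when lower(type) = 'pump' then 1 else 0 end)
               over (order by Sequence) as pump_grp
    from t
"""

# Running sum of sg inside each group, up to and including the current row.
RUNNING = f"""
    select g.*,
           sum(sg) over (partition by pump_grp order by Sequence) as pump_sg
    from ({GROUPS}) g
"""

# Variant with the special case for a previous Pump row.
WITH_SPECIAL_CASE = f"""
    select r.Sequence,
           case when lower(r.prev_type) = 'pump' then r.prev_sg
                else lag(r.pump_sg) over (order by r.Sequence)
           end as your_value
    from ({RUNNING}) r
"""

# Simplified variant: just lag the running sum.
SIMPLIFIED = f"""
    select r.Sequence,
           lag(r.pump_sg) over (order by r.Sequence) as your_value
    from ({RUNNING}) r
"""


def run(query):
    """Run query on an in-memory table t; return (Sequence, your_value) rows."""
    con = sqlite3.connect(":memory:")
    con.execute("create table t (Sequence integer, type text, sg integer)")
    con.executemany("insert into t values (?, ?, ?)", ROWS)
    result = con.execute(
        f"select * from ({query}) q order by Sequence").fetchall()
    con.close()
    return result


print(run(SIMPLIFIED))
# [(1, None), (2, 3), (3, 2), (4, 6), (5, 5), (6, 3), (7, 6), (8, 13), (9, 21)]
print(run(SIMPLIFIED) == run(WITH_SPECIAL_CASE))  # True
```

The running sum matters here. With a plain partition by pump_grp and no order by, every row in a group gets the group's total, so lag() would return 21 for Sequence = 7 instead of 6.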