Spark SQL window 带条件的函数范围边界

Spark SQL window function range boundaries with condition

我的数据如下所示:

         Sequence|       type      | sg       |
+-----------------+----------------+----------+
|              1| Pump             |3         |
|              2| Pump             |2         |
|              3| Inject           |4         |
|              4| Pump             |5         |
|              5| Pump             |3         | 
|              6| pump             |6         |
|              7| Inject           |7         |
|              8| Inject           |8         |
|              9| Pump             |9         |
+-----------------+----------------+----------+

我想添加一个新列并检查以前的 type 值。

如果之前的type值为Pump,将新列的值设为对应的sg.

的值

如果是inject,则获取前面所有行的sg值的总和,直到找到具有Pump type的行(它的sg 值被包含在总和中)。

例如: 对于 Sequence = 2,前一行的 typePump,因此新列的值应该是相应 sg 列的值:3.

对于 Sequence = 9,前一行的 typeInject,因此新列的值将是前 3 行的 sg 列的总和,因为 Sequence = 6 行是带有 type = Pump 的第一行。新列的值将是 8 + 7 + 6 = 21.

最终输出应该是这样的:

       Sequence|       type      | sg       |  New sg |
+-----------------+----------------+----------+--------+
|              1| Pump             |3         |-
|              2| Pump             |2         |3
|              3| Inject           |4         |2
|              4| Pump             |5         |6
|              5| Pump             |3         |5
|              6| pump             |6         |3
|              7| Inject           |7         |6
|              8| Inject           |8         |7
|              9| Pump             |9         |21
+-----------------+----------------+----------+

根据您的规则,这只是一堆 window 函数。诀窍是按组聚合 "pump" 值和 "inject"s。 "pump"s 的累积总和找到组。

则查询为:

select t.*,
        (case when prev_type = 'Pump' then sg
              else lag(pump_sg) over (order by id)
         end) as your_value
from (select t.*,
             sum(sg) over (partition by pump_grp) as pump_sg
      from (select t.*,
                   lag(sg) over (order by id) as prev_sg,
                   lag(type) over (order by id) as prev_type,
                 sum(case when type = 'Pump' then 1 else 0 end) over (order by id) as pump_grp
            from t
           ) t
     ) t;

我认为您的规则过于复杂,并且您不需要前一行 "pump" 的特例。所以:

select t.*,
       lag(pump_sg) over (order by id) as your_value
from (select t.*,
             sum(sg) over (partition by pump_grp) as pump_sg
      from (select t.*,
                 sum(case when type = 'Pump' then 1 else 0 end) over (order by id) as pump_grp
            from t
           ) t
     ) t;