使用时间序列数据删除 Spark 数据帧中的冗余行

Removing redundant rows in a Spark data frame with time series data

我有一个如下所示的 Spark 数据框(为清楚起见,简化了时间戳和 id 列值):

| Timestamp | id |     status  |
--------------------------------
|         1 |  1 |     pending |
|         2 |  2 |     pending |
|         3 |  1 | in-progress |
|         4 |  1 | in-progress |
|         5 |  3 | in-progress |
|         6 |  1 |     pending |
|         7 |  4 |      closed |
|         8 |  1 |     pending |
|         9 |  1 | in-progress |

这是状态事件的时间序列。我最终想要的只是代表状态变化的行。从这个意义上说,问题可以看作是删除冗余行的问题之一 - 例如时间为 4 和 8 的条目 - 都为 id = 1 - 应该被删除,因为它们不代表给定 id 的状态变化。

对于上面的一组行,这将给出(顺序不重要):

| Timestamp | id |     status  |
--------------------------------
|         1 |  1 |     pending |
|         2 |  2 |     pending |
|         3 |  1 | in-progress |
|         5 |  3 | in-progress |
|         6 |  1 |     pending |
|         7 |  4 |      closed |
|         9 |  1 | in-progress |

最初的计划是按 ID 和状态进行分区,按时间戳排序,并为每个分区选择第一行 - 然而这会导致

| Timestamp | id |     status  |
--------------------------------
|         1 |  1 |     pending |
|         2 |  2 |     pending |
|         3 |  1 | in-progress |
|         5 |  3 | in-progress |
|         7 |  4 |      closed |

即它会丢失重复的状态更改。

感谢任何指点,我是数据框的新手,可能会遗漏一两个技巧。

使用 lag window 函数应该可以解决问题

case class Event(timestamp: Int, id: Int, status: String)

val events = sqlContext.createDataFrame(sc.parallelize(
    Event(1, 1, "pending") :: Event(2, 2, "pending") ::
    Event(3, 1, "in-progress") :: Event(4, 1, "in-progress") ::
    Event(5, 3, "in-progress") :: Event(6, 1, "pending") ::
    Event(7, 4, "closed") :: Event(8, 1, "pending") ::
    Event(9, 1, "in-progress") :: Nil
))

events.registerTempTable("events")

val query = """SELECT timestamp, id, status FROM (
    SELECT timestamp, id, status, lag(status) OVER (
        PARTITION BY id ORDER BY timestamp
    ) AS prev_status  FROM events) tmp
    WHERE prev_status IS NULL OR prev_status != status
    ORDER BY timestamp, id"""

sqlContext.sql(query).show

内部查询

SELECT timestamp, id, status, lag(status) OVER (
    PARTITION BY id ORDER BY timestamp
) AS prev_status  FROM events

如下所示创建 table,其中 prev_status 是给定 idstatus 的先前值,并按 timestamp.[=24= 排序]

+---------+--+-----------+-----------+
|timestamp|id|     status|prev_status|
+---------+--+-----------+-----------+
|        1| 1|    pending|       null|
|        3| 1|in-progress|    pending|
|        4| 1|in-progress|in-progress|
|        6| 1|    pending|in-progress|
|        8| 1|    pending|    pending|
|        9| 1|in-progress|    pending|
|        2| 2|    pending|       null|
|        5| 3|in-progress|       null|
|        7| 4|     closed|       null|
+---------+--+-----------+-----------+

外部查询

SELECT timestamp, id, status FROM (...)
WHERE prev_status IS NULL OR prev_status != status
ORDER BY timestamp, id

仅过滤 prev_statusNULL 的行(给定 id 的第一行)或 prev_statusstatus 不同的行(有一个连续时间戳之间的状态变化)。添加订单只是为了更容易进行目视检查。