Removing redundant rows in a Spark data frame with time series data
I have a Spark data frame that looks like this (timestamp and id column values simplified for clarity):
| Timestamp | id | status |
--------------------------------
| 1 | 1 | pending |
| 2 | 2 | pending |
| 3 | 1 | in-progress |
| 4 | 1 | in-progress |
| 5 | 3 | in-progress |
| 6 | 1 | pending |
| 7 | 4 | closed |
| 8 | 1 | pending |
| 9 | 1 | in-progress |
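This is a time series of status events. What I ultimately want is only the rows that represent a status change. In that sense, the problem can be seen as one of removing redundant rows: for example, the entries at times 4 and 8, both for id = 1, should be removed because they do not represent a status change for that id.
For the set of rows above, this would give (order is not important):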
| Timestamp | id | status |
--------------------------------
| 1 | 1 | pending |
| 2 | 2 | pending |
| 3 | 1 | in-progress |
| 5 | 3 | in-progress |
| 6 | 1 | pending |
| 7 | 4 | closed |
| 9 | 1 | in-progress |
The initial plan was to partition by id and status, order by timestamp, and pick the first row of each partition. However, that gives:
| Timestamp | id | status |
--------------------------------
| 1 | 1 | pending |
| 2 | 2 | pending |
| 3 | 1 | in-progress |
| 5 | 3 | in-progress |
| 7 | 4 | closed |
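That is, it loses status changes back to a status the id has already had (here, the changes at times 6 and 9).
Any pointers appreciated; I'm new to data frames and may be missing a trick or two.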
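To make the failure concrete, here is a minimal sketch of the initial plan over plain Scala collections instead of a Spark data frame (an assumption made so it runs without a cluster; `firstPerIdAndStatus` is an illustrative name). Grouping by `(id, status)` and keeping the earliest row per group collapses every later return to an earlier status into the existing group.

```scala
// Sketch only: plain Scala collections stand in for the Spark data frame.
case class Event(timestamp: Int, id: Int, status: String)

val events = List(
  Event(1, 1, "pending"), Event(2, 2, "pending"),
  Event(3, 1, "in-progress"), Event(4, 1, "in-progress"),
  Event(5, 3, "in-progress"), Event(6, 1, "pending"),
  Event(7, 4, "closed"), Event(8, 1, "pending"),
  Event(9, 1, "in-progress")
)

// The initial plan: partition by (id, status), order by timestamp, keep the first row.
// id 1 returning to "pending" (t=6) or "in-progress" (t=9) lands in a group that
// already exists, so only the earliest row of that group survives.
val firstPerIdAndStatus: List[Event] =
  events
    .groupBy(e => (e.id, e.status))
    .values
    .map(_.minBy(_.timestamp))
    .toList
    .sortBy(_.timestamp)

firstPerIdAndStatus.foreach(println)
```

The surviving timestamps are 1, 2, 3, 5 and 7, matching the table above.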
Using the lag window function should do the trick:
// Spark 1.x: window functions such as lag require a HiveContext, so `sqlContext`
// must be one (as it is in a spark-shell built with Hive support).
case class Event(timestamp: Int, id: Int, status: String)

val events = sqlContext.createDataFrame(sc.parallelize(
  Event(1, 1, "pending") :: Event(2, 2, "pending") ::
  Event(3, 1, "in-progress") :: Event(4, 1, "in-progress") ::
  Event(5, 3, "in-progress") :: Event(6, 1, "pending") ::
  Event(7, 4, "closed") :: Event(8, 1, "pending") ::
  Event(9, 1, "in-progress") :: Nil
))

// Spark 2.0+: registerTempTable is deprecated in favour of
// events.createOrReplaceTempView("events")
events.registerTempTable("events")

val query = """SELECT timestamp, id, status FROM (
  SELECT timestamp, id, status, lag(status) OVER (
    PARTITION BY id ORDER BY timestamp
  ) AS prev_status FROM events) tmp
WHERE prev_status IS NULL OR prev_status != status
ORDER BY timestamp, id"""

sqlContext.sql(query).show
The inner query
SELECT timestamp, id, status, lag(status) OVER (
  PARTITION BY id ORDER BY timestamp
) AS prev_status FROM events
creates a table like the one below, where prev_status is the previous value of status for a given id, ordered by timestamp:
+---------+--+-----------+-----------+
|timestamp|id| status|prev_status|
+---------+--+-----------+-----------+
| 1| 1| pending| null|
| 3| 1|in-progress| pending|
| 4| 1|in-progress|in-progress|
| 6| 1| pending|in-progress|
| 8| 1| pending| pending|
| 9| 1|in-progress| pending|
| 2| 2| pending| null|
| 5| 3|in-progress| null|
| 7| 4| closed| null|
+---------+--+-----------+-----------+
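The semantics of `lag(status) OVER (PARTITION BY id ORDER BY timestamp)` can be sketched over plain Scala collections (an assumption for illustration; this is not how Spark evaluates it). Each id's rows are sorted by timestamp and paired with the status of the preceding row, with `None` standing in for the NULL that `lag` returns on the first row of a partition. `withPrev` is an illustrative name.

```scala
case class Event(timestamp: Int, id: Int, status: String)

val events = List(
  Event(1, 1, "pending"), Event(2, 2, "pending"),
  Event(3, 1, "in-progress"), Event(4, 1, "in-progress"),
  Event(5, 3, "in-progress"), Event(6, 1, "pending"),
  Event(7, 4, "closed"), Event(8, 1, "pending"),
  Event(9, 1, "in-progress")
)

// Model of lag(status) OVER (PARTITION BY id ORDER BY timestamp).
val withPrev: List[(Event, Option[String])] =
  events
    .groupBy(_.id)            // PARTITION BY id
    .toList
    .sortBy(_._1)             // list partitions in id order, like the table above
    .flatMap { case (_, rows) =>
      val sorted = rows.sortBy(_.timestamp)  // ORDER BY timestamp
      // shift the statuses down by one row; the first row has no predecessor
      val prevs: List[Option[String]] = None :: sorted.init.map(e => Some(e.status))
      sorted.zip(prevs)
    }

withPrev.foreach { case (e, prev) =>
  println(s"${e.timestamp} | ${e.id} | ${e.status} | ${prev.getOrElse("null")}")
}
```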
The outer query
SELECT timestamp, id, status FROM (...)
WHERE prev_status IS NULL OR prev_status != status
ORDER BY timestamp, id
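keeps only the rows where prev_status is NULL (the first row for a given id) or prev_status differs from status (the status changed between consecutive events for that id). The ORDER BY is there only to make visual inspection easier.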
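For comparison, the whole query can be modelled on plain Scala collections (an assumption for illustration, not Spark code). Instead of materialising a `prev_status` column, this sketch swaps in a slightly different technique: it zips each id's sorted rows with their own tail and keeps a row when its status differs from the row just before it, which has the same effect as the `lag` filter. `changes` is an illustrative name.

```scala
case class Event(timestamp: Int, id: Int, status: String)

val events = List(
  Event(1, 1, "pending"), Event(2, 2, "pending"),
  Event(3, 1, "in-progress"), Event(4, 1, "in-progress"),
  Event(5, 3, "in-progress"), Event(6, 1, "pending"),
  Event(7, 4, "closed"), Event(8, 1, "pending"),
  Event(9, 1, "in-progress")
)

val changes: List[Event] =
  events
    .groupBy(_.id)                           // PARTITION BY id
    .values
    .flatMap { rows =>
      val sorted = rows.sortBy(_.timestamp)  // ORDER BY timestamp
      // first row per id (prev_status IS NULL) plus every row whose status
      // differs from the row just before it (prev_status != status)
      sorted.head :: sorted.zip(sorted.tail).collect {
        case (prev, cur) if prev.status != cur.status => cur
      }
    }
    .toList
    .sortBy(e => (e.timestamp, e.id))        // ORDER BY timestamp, id

changes.foreach(println)
```

The redundant rows at times 4 and 8 are dropped, while the repeated changes at times 6 and 9 are kept.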