如何识别具有重叠周期的时间序列数据中的周期?

How do I identify cycles in time series data with overlapping periods?

给定具有 4 个任务类别(A、B、C、D)及其相应时间戳的时间序列数据,我的任务是识别 intervals/cycles 个 [(A,B,C,D)_1 , (A,B,C,D)_2, ...]

这会很简单(例如,哈希映射或链表),事件干净、不重叠,但我的数据包含序列(按时间排序),例如 [A、B、A、B、C、D ,C,D]。这是一个例子:

EVENT TIME
Task A 11/1/16 3:57
Task B 11/1/16 4:19
Task A 11/1/16 7:43
Task B 11/1/16 7:43
Task C 11/1/16 7:51
Task D 11/1/16 7:51
Task C 11/1/16 8:11
Task D 11/1/16 8:13
Task A 11/3/16 3:49
Task B 11/3/16 4:11
Task B 11/3/16 7:34
Task A 11/3/16 7:34
Task C 11/3/16 7:43
Task D 11/3/16 7:43
Task C 11/3/16 8:03
Task D 11/3/16 8:05
Task A 11/5/16 3:41
Task B 11/5/16 4:03
Task A 11/5/16 7:26
Task B 11/5/16 7:26
Task D 11/5/16 7:35
Task C 11/5/16 7:35
Task C 11/5/16 7:54
Task D 11/5/16 7:56

在这种情况下,正确的答案是一旦任务 A(循环的开始)已经开始,就删除“内部”/重叠的 ABCD。这导致 3 个时期:

Task A Task B Task C Task D
11/1/16 3:57 11/1/16 4:19 11/1/16 8:11 11/1/16 8:13
11/3/16 3:49 11/3/16 4:11 11/3/16 8:03 11/3/16 8:05
11/5/16 3:41 11/5/16 4:03 11/5/16 7:54 11/5/16 7:56

忽略(暂时)边缘情况,例如不完整的事件序列,是否有一种有效的算法来识别周期,同时合并重叠的内部周期?

只是@user3386109 提到的方法并跟踪事件时间戳。

将输入移动到名为 events.txt 的文件中。

file = open("events.txt", "r")
result = []
partial_result = {}
max_count =0;
tasks_count = [0,0,0,0]
for event in file:
    event = event.strip('\n')
    split_events = event.split()
    max_count = max(tasks_count)
    if len(split_events)==4: #Task data
        task_name = split_events[1]
        time =  split_events[2]+" "+split_events[3]
        idx = ord(task_name)-65
        curr_count = tasks_count[idx]
        if (curr_count==max_count or curr_count+1 == max_count) and task_name not in partial_result:
            partial_result[task_name] = time
        tasks_count[idx] +=1

    if len(partial_result)==4:
        result.append(partial_result)
        partial_result ={}
        tasks_count = [0,0,0,0]

print(result)

最终输出

[{'A': '11/1/16 3:57', 'B': '11/1/16 4:19', 'C': '11/1/16 8:11', 'D': '11/1/16 8:13'}, {'A': '11/3/16 3:49', 'B': '11/3/16 4:11', 'C': '11/3/16 8:03', 'D': '11/3/16 8:05'}, {'A': '11/5/16 3:41', 'B': '11/5/16 4:03', 'C': '11/5/16 7:54', 'D': '11/5/16 7:56'}]

您可以使用 collections.defaultdict:

import collections, datetime, re
r, d = [], collections.defaultdict(list)
data = [['Task A', '11/1/16 3:57'], ['Task B', '11/1/16 4:19'], ['Task A', '11/1/16 7:43'], ['Task B', '11/1/16 7:43'], ['Task C', '11/1/16 7:51'], ['Task D', '11/1/16 7:51'], ['Task C', '11/1/16 8:11'], ['Task D', '11/1/16 8:13'], ['Task A', '11/3/16 3:49'], ['Task B', '11/3/16 4:11'], ['Task B', '11/3/16 7:34'], ['Task A', '11/3/16 7:34'], ['Task C', '11/3/16 7:43'], ['Task D', '11/3/16 7:43'], ['Task C', '11/3/16 8:03'], ['Task D', '11/3/16 8:05'], ['Task A', '11/5/16 3:41'], ['Task B', '11/5/16 4:03'], ['Task A', '11/5/16 7:26'], ['Task B', '11/5/16 7:26'], ['Task D', '11/5/16 7:35'], ['Task C', '11/5/16 7:35'], ['Task C', '11/5/16 7:54'], ['Task D', '11/5/16 7:56']]
for a, b in data: 
   v = list(map(int, re.findall('\d+', b)))
   _date = datetime.datetime(v[2], v[0], v[1], v[-2], v[-1], 0)
   if (k:=a.split()[-1]) == 'A' and all(j in d for j in ['A', 'B', 'C', 'D']):
       r.append(d)
       d = collections.defaultdict(list)
       d[k].append(_date)
   else:
       d[k].append(_date)

r.append(d)

f, f1 = {'A':min, 'B':min, 'C':max, 'D':max}, lambda x:f'{x.month}/{x.day}/{x.year} {x.hour}:{str(x.minute).zfill(2)}'
result = [{a:f1(f[a](b)) for a, b in i.items()} for i in r]

输出:

[{'A': '11/1/16 3:57', 'B': '11/1/16 4:19', 'C': '11/1/16 8:11', 'D': '11/1/16 8:13'}, 
 {'A': '11/3/16 3:49', 'B': '11/3/16 4:11', 'C': '11/3/16 8:03', 'D': '11/3/16 8:05'}, 
 {'A': '11/5/16 3:41', 'B': '11/5/16 4:03', 'C': '11/5/16 7:54', 'D': '11/5/16 7:56'}]