How do I identify cycles in time series data with overlapping periods?
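Given time series data with 4 task categories (A, B, C, D) and their corresponding timestamps, my task is to identify the intervals/cycles [(A,B,C,D)_1, (A,B,C,D)_2, ...].
This would be simple (e.g. with a hash map or linked list) if the events were clean and non-overlapping, but my data contains sequences (sorted by time) such as [A, B, A, B, C, D, C, D]. Here is an example: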
EVENT | TIME |
---|---|
Task A | 11/1/16 3:57 |
Task B | 11/1/16 4:19 |
Task A | 11/1/16 7:43 |
Task B | 11/1/16 7:43 |
Task C | 11/1/16 7:51 |
Task D | 11/1/16 7:51 |
Task C | 11/1/16 8:11 |
Task D | 11/1/16 8:13 |
Task A | 11/3/16 3:49 |
Task B | 11/3/16 4:11 |
Task B | 11/3/16 7:34 |
Task A | 11/3/16 7:34 |
Task C | 11/3/16 7:43 |
Task D | 11/3/16 7:43 |
Task C | 11/3/16 8:03 |
Task D | 11/3/16 8:05 |
Task A | 11/5/16 3:41 |
Task B | 11/5/16 4:03 |
Task A | 11/5/16 7:26 |
Task B | 11/5/16 7:26 |
Task D | 11/5/16 7:35 |
Task C | 11/5/16 7:35 |
Task C | 11/5/16 7:54 |
Task D | 11/5/16 7:56 |
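In this case, the correct answer is to drop the "inner"/overlapping ABCD events once Task A (the start of a cycle) has already begun. This results in 3 periods: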
Task A | Task B | Task C | Task D |
---|---|---|---|
11/1/16 3:57 | 11/1/16 4:19 | 11/1/16 8:11 | 11/1/16 8:13 |
11/3/16 3:49 | 11/3/16 4:11 | 11/3/16 8:03 | 11/3/16 8:05 |
11/5/16 3:41 | 11/5/16 4:03 | 11/5/16 7:54 | 11/5/16 7:56 |
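Ignoring (for now) edge cases such as incomplete event sequences, is there an efficient algorithm to identify cycles while merging the overlapping inner cycles?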
This just applies the approach @user3386109 mentioned and keeps track of the event timestamps.
Move the input into a file named events.txt.
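result = []
partial_result = {}          # accepted timestamp per task in the current cycle
tasks_count = [0, 0, 0, 0]   # how often A, B, C, D were seen in the current cycle
with open("events.txt", "r") as file:
    for event in file:
        split_events = event.split()
        max_count = max(tasks_count)
        if len(split_events) == 4:  # task data: "Task", name, date, time
            task_name = split_events[1]
            time = split_events[2] + " " + split_events[3]
            idx = ord(task_name) - ord("A")
            curr_count = tasks_count[idx]
            # accept a task only while its count is at most one behind the leader
            # and no timestamp has been stored for it in this cycle yet
            if (curr_count == max_count or curr_count + 1 == max_count) and task_name not in partial_result:
                partial_result[task_name] = time
            tasks_count[idx] += 1
            # all four tasks collected: close the cycle and start over
            if len(partial_result) == 4:
                result.append(partial_result)
                partial_result = {}
                tasks_count = [0, 0, 0, 0]
print(result)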
Final output:
[{'A': '11/1/16 3:57', 'B': '11/1/16 4:19', 'C': '11/1/16 8:11', 'D': '11/1/16 8:13'}, {'A': '11/3/16 3:49', 'B': '11/3/16 4:11', 'C': '11/3/16 8:03', 'D': '11/3/16 8:05'}, {'A': '11/5/16 3:41', 'B': '11/5/16 4:03', 'C': '11/5/16 7:54', 'D': '11/5/16 7:56'}]
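The script above does not show what events.txt looks like. Judging from the `len(split_events) == 4` check, it presumably expects one event per line with four whitespace-separated tokens, such as `Task A 11/1/16 3:57`. Below is a minimal sketch that wraps the same counting logic in a function so it can be tried without a file. The function name `find_cycles` and the in-memory `sample` list are assumptions made for illustration, not part of the original answer.

```python
def find_cycles(lines):
    """Apply the counting approach above to an iterable of text lines.

    Assumes each event line looks like 'Task A 11/1/16 3:57'
    (four whitespace-separated tokens); other lines are skipped.
    """
    result = []
    partial = {}            # accepted timestamp per task in the current cycle
    counts = [0, 0, 0, 0]   # occurrences of A, B, C, D in the current cycle
    for line in lines:
        parts = line.split()
        if len(parts) != 4:
            continue
        name = parts[1]
        stamp = parts[2] + " " + parts[3]
        idx = ord(name) - ord("A")
        max_count = max(counts)
        # accept only if this task is level with, or one behind, the leader
        if counts[idx] in (max_count, max_count - 1) and name not in partial:
            partial[name] = stamp
        counts[idx] += 1
        if len(partial) == 4:   # cycle complete: store it and reset
            result.append(partial)
            partial = {}
            counts = [0, 0, 0, 0]
    return result


# First day of the example data, in the assumed line format.
sample = [
    "Task A 11/1/16 3:57",
    "Task B 11/1/16 4:19",
    "Task A 11/1/16 7:43",
    "Task B 11/1/16 7:43",
    "Task C 11/1/16 7:51",
    "Task D 11/1/16 7:51",
    "Task C 11/1/16 8:11",
    "Task D 11/1/16 8:13",
]
cycles = find_cycles(sample)
print(cycles)
```

Note that only dictionaries containing all four tasks are appended, so an incomplete trailing cycle is silently dropped.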
You can use collections.defaultdict:
import collections, datetime, re
r, d = [], collections.defaultdict(list)
data = [['Task A', '11/1/16 3:57'], ['Task B', '11/1/16 4:19'], ['Task A', '11/1/16 7:43'], ['Task B', '11/1/16 7:43'], ['Task C', '11/1/16 7:51'], ['Task D', '11/1/16 7:51'], ['Task C', '11/1/16 8:11'], ['Task D', '11/1/16 8:13'], ['Task A', '11/3/16 3:49'], ['Task B', '11/3/16 4:11'], ['Task B', '11/3/16 7:34'], ['Task A', '11/3/16 7:34'], ['Task C', '11/3/16 7:43'], ['Task D', '11/3/16 7:43'], ['Task C', '11/3/16 8:03'], ['Task D', '11/3/16 8:05'], ['Task A', '11/5/16 3:41'], ['Task B', '11/5/16 4:03'], ['Task A', '11/5/16 7:26'], ['Task B', '11/5/16 7:26'], ['Task D', '11/5/16 7:35'], ['Task C', '11/5/16 7:35'], ['Task C', '11/5/16 7:54'], ['Task D', '11/5/16 7:56']]
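for a, b in data:
    # extract month, day, year, hour, minute from the timestamp string
    v = list(map(int, re.findall(r'\d+', b)))
    _date = datetime.datetime(v[2], v[0], v[1], v[-2], v[-1], 0)
    # a Task A seen after all four tasks are present starts a new cycle
    # (the := operator requires Python 3.8+)
    if (k:=a.split()[-1]) == 'A' and all(j in d for j in ['A', 'B', 'C', 'D']):
        r.append(d)
        d = collections.defaultdict(list)
        d[k].append(_date)
    else:
        d[k].append(_date)
r.append(d)
# keep the earliest A and B and the latest C and D of each cycle
f = {'A':min, 'B':min, 'C':max, 'D':max}
f1 = lambda x:f'{x.month}/{x.day}/{x.year} {x.hour}:{str(x.minute).zfill(2)}'
result = [{a:f1(f[a](b)) for a, b in i.items()} for i in r]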
Output:
[{'A': '11/1/16 3:57', 'B': '11/1/16 4:19', 'C': '11/1/16 8:11', 'D': '11/1/16 8:13'},
{'A': '11/3/16 3:49', 'B': '11/3/16 4:11', 'C': '11/3/16 8:03', 'D': '11/3/16 8:05'},
{'A': '11/5/16 3:41', 'B': '11/5/16 4:03', 'C': '11/5/16 7:54', 'D': '11/5/16 7:56'}]
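For comparison, here is a sketch of the same grouping rule that parses timestamps with `datetime.strptime` instead of a regular expression. The function name `merge_cycles`, the `(label, timestamp)` tuple input, and the format string `FMT` are assumptions made for illustration. The rule it encodes is the one read off the example: a new cycle starts at the first Task A seen after the current cycle already contains all four tasks, and each cycle keeps the earliest A and B and the latest C and D.

```python
from datetime import datetime

FMT = "%m/%d/%y %H:%M"  # assumed timestamp format, e.g. '11/1/16 3:57'


def merge_cycles(events):
    """Group (label, timestamp) pairs, sorted by time, into merged cycles."""
    cycles, current = [], {}
    for label, stamp in events:
        task = label.split()[-1]            # 'Task A' -> 'A'
        when = datetime.strptime(stamp, FMT)
        # a Task A after a complete A-B-C-D set opens the next cycle
        if task == "A" and all(t in current for t in "ABCD"):
            cycles.append(current)
            current = {}
        if task in ("A", "B"):
            # start-side tasks: keep the earliest occurrence
            current[task] = min(when, current.get(task, when))
        else:
            # end-side tasks: keep the latest occurrence
            current[task] = max(when, current.get(task, when))
    if current:
        cycles.append(current)              # may be an incomplete cycle
    return cycles


events = [
    ("Task A", "11/1/16 3:57"), ("Task B", "11/1/16 4:19"),
    ("Task A", "11/1/16 7:43"), ("Task B", "11/1/16 7:43"),
    ("Task C", "11/1/16 7:51"), ("Task D", "11/1/16 7:51"),
    ("Task C", "11/1/16 8:11"), ("Task D", "11/1/16 8:13"),
    ("Task A", "11/3/16 3:49"), ("Task B", "11/3/16 4:11"),
]
cycles = merge_cycles(events)
for cycle in cycles:
    print({task: cycle[task].strftime("%m/%d/%y %H:%M") for task in sorted(cycle)})
```

Like the answer above, this sketch appends whatever is left at the end of the input, so the last entry may hold fewer than four tasks. Callers that want only complete cycles would need to filter on `len(cycle) == 4`.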