python 时间滑动 window 变化
python time sliding window variation
我遇到了滑动 window 问题的变体!
通常我们设置要滑动的元素数量,但在我的例子中,我想滑动时间!
我想达到的目标是一个函数(在本例中是线程)
能够在几秒钟内创建一个 "time" windows(由用户提供)。
在这种情况下从队列的第一个元素开始:
[datetime.time(7, 6, 14, 537370), 584 加 5 秒 -> 7:6:19.537370(终点)求和这个区间内的所有元素:
[datetime.time(7, 6, 14, 537370), 584]
[datetime.time(7, 6, 18, 542798), 761]
总计:584+761=1345
然后用第二个元素创建另一个 "windows" 并继续。
重要提示:一项可以是多项 window 的一部分。该项目是同时生成的,具有休眠 n 秒然后刷新队列功能的 naif 解决方案对我的问题不利。
我认为它是 post 的变体:
Flexible sliding window (in Python)
但是还是不能解决问题!任何帮助或建议将不胜感激。
谢谢!
元素列表示例:
[datetime.time(7, 6, 14, 537370), 584]
[datetime.time(7, 6, 18, 542798), 761]
[datetime.time(7, 6, 20, 546007), 848]
[datetime.time(7, 6, 24, 550969), 20]
[datetime.time(7, 6, 27, 554370), 478]
[datetime.time(7, 6, 27, 554628), 12]
[datetime.time(7, 6, 31, 558919), 29]
[datetime.time(7, 6, 31, 559562), 227]
[datetime.time(7, 6, 32, 560863), 379]
[datetime.time(7, 6, 35, 564863), 132]
[datetime.time(7, 6, 37, 567276), 651]
[datetime.time(7, 6, 38, 568652), 68]
[datetime.time(7, 6, 40, 569861), 100]
[datetime.time(7, 6, 41, 571459), 722]
[datetime.time(7, 6, 44, 574802), 560]
...
代码:
import random
import time
import threading
import datetime
from multiprocessing import Queue
q = Queue()
#this is a producer that put elements in queue
def t1():
element = [0,0]
while True:
time.sleep(random.randint(0, 5))
element[0] = datetime.datetime.now().time()
element[1] = random.randint(0, 1000)
q.put(element)
#this is a consumer that sum elements inside a window of n seconds
#Ineed something a sliding window time of ten seconds that sum all elements for n seconds
def t2():
windowsize = 5 #size of the window 5 seconds
while not queue.empty():
e = q.get()
start = e[0] #the first element is the beginning point
end = start + datetime.timedelta(seconds=windowsize) #ending point
sum += e[1]
#some code that solve the problem :)
a = threading.Thread(target=t1)
a.start()
b = threading.Thread(target=t2)
b.start()
while True:
time.sleep(1)
这样可以吗?这就是我理解你的问题的方式。它的作用是创建一个 class 来跟踪事物。您可以通过 tw.insert() 添加到此或与 tw.sum_window(seconds) 求和。
初始化TimeWindow时,可以给它一个max size参数,默认是10秒。当您添加元素或计算总和时,它会进行清理,以便在每次插入或求和操作之前,第一个元素时间 e[0][0] 和最后一个元素时间 e[n][0] 彼此相差 10 秒以内.旧的条目被删除。那里有一个 "poller" 线程来跟踪您的请求。
我添加了两个队列,因为我不知道您打算如何处理结果。现在,如果你想请求从现在开始到未来 5 秒的数据,你可以创建一个请求并将其放入队列中。该请求有一个随机 ID,以便您可以将其与结果相匹配。您的主线程需要监视结果队列,五秒钟后,每个请求都会以相同的 ID 和总和发送到队列 return。
如果这不是你想要做的,那么我就是不明白你在这里试图达到什么目的。即使这已经相当复杂,并且可能有一种更简单的方法来实现您打算做的事情。
import random
import time
import threading
import datetime
import Queue
import uuid
from collections import deque
q_lock = threading.RLock()
class TimeWindow(object):
def __init__(self, max_size=10):
self.max_size = max_size
self.q = deque()
def expire(self):
time_now = datetime.datetime.now()
while True:
try:
oldest_element = self.q.popleft()
oe_time = oldest_element[0]
if oe_time + datetime.timedelta(seconds=self.max_size) > time_now:
self.q.appendleft(oldest_element)
break
except IndexError:
break
def insert(self,elm):
self.expire()
self.q.append(elm)
def sum_window(self, start, end):
self.expire()
try:
_ = self.q[0]
except IndexError:
return 0
result=0
for f in self.q:
if start < f[0] < end:
result += f[1]
else:
pass
return result
tw = TimeWindow()
def t1():
while True:
time.sleep(random.randint(0, 3))
element = [datetime.datetime.now(), random.randint(0,1000)]
with q_lock:
tw.insert(element)
def poller(in_q, out_q):
pending = []
while True:
try:
new_request = in_q.get(0.1)
new_request["end"] = new_request["start"] + datetime.timedelta(seconds=new_request["frame"])
pending.append(new_request)
except Queue.Empty:
pass
new_pending = []
for a in pending:
if a["end"] < datetime.datetime.now():
with q_lock:
r_sum = tw.sum_window(a["start"], a["end"])
r_structure = {"id": a["id"], "result": r_sum}
out_q.put(r_structure)
else:
new_pending.append(a)
pending = new_pending
a = threading.Thread(target=t1)
a.daemon = True
a.start()
in_queue = Queue.Queue()
result_queue = Queue.Queue()
po = threading.Thread(target=poller, args=(in_queue, result_queue,))
po.daemon = True
po.start()
while True:
time.sleep(1)
newr = {"id": uuid.uuid4(), "frame": 5, "start": datetime.datetime.now()}
in_queue.put(newr)
try:
ready = result_queue.get(0)
print ready
except Queue.Empty:
pass
garim@wof:~$ python solution.py
1 t1 produce element: 16:09:30.472497 1
2 t1 produce element: 16:09:33.475714 9
3 t1 produce element: 16:09:34.476922 10
4 t1 produce element: 16:09:37.480100 7
solution: 16:09:37.481171 {'id': UUID('adff334f-a97a-459d-8dcc-f28309e25574'), 'result': 19}
5 t1 produce element: 16:09:38.481352 10
solution: 16:09:38.482687 {'id': UUID('0a7481e5-e993-439a-9f7e-2c5aeef86155'), 'result': 19}
它仍然有效 :( 我为它使用函数 t1 插入的每个元素添加了一个计数器。目标是此时求和 (result_queue.get):
16:09:35.472497 ---> 16:09:30.472497 + 5 秒
之前没有。只有这样元素才会熄灭。下次汇总时间:
16:09:35.475714 ---> 16:09:33.475714 + 5 秒
我知道这很难解释.. 有了你的两个解决方案,时间 window 幻灯片所以我可以认为问题已经解决了:)我会尝试改进函数 sum 的执行时间,即时间触发很重要。我学到了很多有用的知识。感谢您的帮助。
我遇到了滑动 window 问题的变体!
通常我们设置要滑动的元素数量,但在我的例子中,我想滑动时间!
我想达到的目标是一个函数(在本例中是线程) 能够在几秒钟内创建一个 "time" windows(由用户提供)。
在这种情况下从队列的第一个元素开始:
[datetime.time(7, 6, 14, 537370), 584 加 5 秒 -> 7:6:19.537370(终点)求和这个区间内的所有元素:
[datetime.time(7, 6, 14, 537370), 584]
[datetime.time(7, 6, 18, 542798), 761]
总计:584+761=1345
然后用第二个元素创建另一个 "windows" 并继续。 重要提示:一项可以是多项 window 的一部分。该项目是同时生成的,具有休眠 n 秒然后刷新队列功能的 naif 解决方案对我的问题不利。
我认为它是 post 的变体: Flexible sliding window (in Python)
但是还是不能解决问题!任何帮助或建议将不胜感激。 谢谢!
元素列表示例:
[datetime.time(7, 6, 14, 537370), 584]
[datetime.time(7, 6, 18, 542798), 761]
[datetime.time(7, 6, 20, 546007), 848]
[datetime.time(7, 6, 24, 550969), 20]
[datetime.time(7, 6, 27, 554370), 478]
[datetime.time(7, 6, 27, 554628), 12]
[datetime.time(7, 6, 31, 558919), 29]
[datetime.time(7, 6, 31, 559562), 227]
[datetime.time(7, 6, 32, 560863), 379]
[datetime.time(7, 6, 35, 564863), 132]
[datetime.time(7, 6, 37, 567276), 651]
[datetime.time(7, 6, 38, 568652), 68]
[datetime.time(7, 6, 40, 569861), 100]
[datetime.time(7, 6, 41, 571459), 722]
[datetime.time(7, 6, 44, 574802), 560]
...
代码:
import random
import time
import threading
import datetime
from multiprocessing import Queue
q = Queue()
#this is a producer that put elements in queue
def t1():
element = [0,0]
while True:
time.sleep(random.randint(0, 5))
element[0] = datetime.datetime.now().time()
element[1] = random.randint(0, 1000)
q.put(element)
#this is a consumer that sum elements inside a window of n seconds
#Ineed something a sliding window time of ten seconds that sum all elements for n seconds
def t2():
windowsize = 5 #size of the window 5 seconds
while not queue.empty():
e = q.get()
start = e[0] #the first element is the beginning point
end = start + datetime.timedelta(seconds=windowsize) #ending point
sum += e[1]
#some code that solve the problem :)
a = threading.Thread(target=t1)
a.start()
b = threading.Thread(target=t2)
b.start()
while True:
time.sleep(1)
这样可以吗?这就是我理解你的问题的方式。它的作用是创建一个 class 来跟踪事物。您可以通过 tw.insert() 添加到此或与 tw.sum_window(seconds) 求和。
初始化TimeWindow时,可以给它一个max size参数,默认是10秒。当您添加元素或计算总和时,它会进行清理,以便在每次插入或求和操作之前,第一个元素时间 e[0][0] 和最后一个元素时间 e[n][0] 彼此相差 10 秒以内.旧的条目被删除。那里有一个 "poller" 线程来跟踪您的请求。
我添加了两个队列,因为我不知道您打算如何处理结果。现在,如果你想请求从现在开始到未来 5 秒的数据,你可以创建一个请求并将其放入队列中。该请求有一个随机 ID,以便您可以将其与结果相匹配。您的主线程需要监视结果队列,五秒钟后,每个请求都会以相同的 ID 和总和发送到队列 return。
如果这不是你想要做的,那么我就是不明白你在这里试图达到什么目的。即使这已经相当复杂,并且可能有一种更简单的方法来实现您打算做的事情。
import random
import time
import threading
import datetime
import Queue
import uuid
from collections import deque
q_lock = threading.RLock()
class TimeWindow(object):
def __init__(self, max_size=10):
self.max_size = max_size
self.q = deque()
def expire(self):
time_now = datetime.datetime.now()
while True:
try:
oldest_element = self.q.popleft()
oe_time = oldest_element[0]
if oe_time + datetime.timedelta(seconds=self.max_size) > time_now:
self.q.appendleft(oldest_element)
break
except IndexError:
break
def insert(self,elm):
self.expire()
self.q.append(elm)
def sum_window(self, start, end):
self.expire()
try:
_ = self.q[0]
except IndexError:
return 0
result=0
for f in self.q:
if start < f[0] < end:
result += f[1]
else:
pass
return result
tw = TimeWindow()
def t1():
while True:
time.sleep(random.randint(0, 3))
element = [datetime.datetime.now(), random.randint(0,1000)]
with q_lock:
tw.insert(element)
def poller(in_q, out_q):
pending = []
while True:
try:
new_request = in_q.get(0.1)
new_request["end"] = new_request["start"] + datetime.timedelta(seconds=new_request["frame"])
pending.append(new_request)
except Queue.Empty:
pass
new_pending = []
for a in pending:
if a["end"] < datetime.datetime.now():
with q_lock:
r_sum = tw.sum_window(a["start"], a["end"])
r_structure = {"id": a["id"], "result": r_sum}
out_q.put(r_structure)
else:
new_pending.append(a)
pending = new_pending
a = threading.Thread(target=t1)
a.daemon = True
a.start()
in_queue = Queue.Queue()
result_queue = Queue.Queue()
po = threading.Thread(target=poller, args=(in_queue, result_queue,))
po.daemon = True
po.start()
while True:
time.sleep(1)
newr = {"id": uuid.uuid4(), "frame": 5, "start": datetime.datetime.now()}
in_queue.put(newr)
try:
ready = result_queue.get(0)
print ready
except Queue.Empty:
pass
garim@wof:~$ python solution.py
1 t1 produce element: 16:09:30.472497 1
2 t1 produce element: 16:09:33.475714 9
3 t1 produce element: 16:09:34.476922 10
4 t1 produce element: 16:09:37.480100 7
solution: 16:09:37.481171 {'id': UUID('adff334f-a97a-459d-8dcc-f28309e25574'), 'result': 19}
5 t1 produce element: 16:09:38.481352 10
solution: 16:09:38.482687 {'id': UUID('0a7481e5-e993-439a-9f7e-2c5aeef86155'), 'result': 19}
它仍然有效 :( 我为它使用函数 t1 插入的每个元素添加了一个计数器。目标是此时求和 (result_queue.get):
16:09:35.472497 ---> 16:09:30.472497 + 5 秒
之前没有。只有这样元素才会熄灭。下次汇总时间:
16:09:35.475714 ---> 16:09:33.475714 + 5 秒
我知道这很难解释.. 有了你的两个解决方案,时间 window 幻灯片所以我可以认为问题已经解决了:)我会尝试改进函数 sum 的执行时间,即时间触发很重要。我学到了很多有用的知识。感谢您的帮助。