随时间验证连续条件
Verify continuous condition with time
我想开发一个 python 程序,从某个时刻开始,等待 60 秒再执行一个动作。该程序必须具有的另一个功能是,如果我更新初始时间,它必须开始检查条件。我想过用线程来做,但我不知道如何停止线程并在新的开始时间重新启动它。
import thread
import time
# Define a function for the thread
def check_message (last, timer):
oldtime = time.time()
print oldtime
# check
while time.time() - oldtime <= 60:
print (time.time() - oldtime)
print "One minute"+ str(time.time())
return 1
# Create two threads as follows
try:
named_tuple = time.localtime() # get struct_time
time_string = time.strftime("%H:%M:%S", named_tuple)
thread.start_new_thread(check_message , (time_string, 60))
except:
print "Error: unable to start thread"
while 1:
pass
谢谢!
一个选择是在线程外进行检查,这样主循环每 60 秒执行一个线程来执行 X 作业:
import threading
import time
# Define a function for the thread
def check_message():
print("One minute"+ str(time.time()))
return 1
last_execution = time.time()
while 1:
if time.time() - last_execution < 60:
time.sleep(1)
else:
last_execution = time.time()
threading.Thread(target=check_message).start()
# Python2:
# import thread
# thread.start_new_thread(check_message)
我将代码替换为适用于 Python3 的语法,因为我没有安装 Python2。但是总体思路在两个版本中应该是一样的。
这里可能没有必要在循环中检查时间并且很浪费,因为您可以让线程休眠并让内核在时间到了时将其唤醒。
线程库为此类用例提供 threading.Timer
。你的问题是,你不能中断这样一个睡眠线程来调整执行指定函数的时间间隔。
我在下面的示例中使用自定义管理器-class TimeLord
来克服此限制。 TimeLord
通过取消当前计时器并用新计时器替换它来启用 "resetting" 计时器。
为此,TimeLord
包含一个包装中间工作函数和一个 "token" 属性,必须由 运行 计时器实例弹出以执行指定的目标函数。
由于 dict.pop()
是原子操作,因此这种设计保证了指定目标函数的唯一执行。 timelord.reset()
只要当前计时器还没有启动它的线程并弹出 _token
就有效。这种方法不能完全防止在尝试 "reset" 时新定时器线程可能无效的启动,但是当它发生时它是一种不重要的冗余,因为目标函数只能执行一次。
此代码使用 Python 2 和 3 运行:
import time
from datetime import datetime
from threading import Timer, current_thread
def f(x):
print('{} {}: RUNNING TARGET FUNCTION'.format(
datetime.now(), current_thread().name)
)
time.sleep(x)
print('{} {}: EXITING'.format(datetime.now(), current_thread().name))
class TimeLord:
"""
Manager Class for threading.Timer instance. Allows "resetting" `interval`
as long execution of `function` has not started by canceling the old
and constructing a new timer instance.
"""
def worker(self, *args, **kwargs):
try:
self.__dict__.pop("_token") # dict.pop() is atomic
except KeyError:
pass
else:
self.func(*args, **kwargs)
def __init__(self, interval, function, args=None, kwargs=None):
self.func = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self._token = True
self._init_timer(interval)
def _init_timer(self, interval):
self._timer = Timer(interval, self.worker, self.args, self.kwargs)
self._timer.daemon = True
def start(self):
self._timer.start()
print('{} {}: STARTED with `interval={}`'.format(
datetime.now(), self._timer.name, self._timer.interval)
)
def reset(self, interval):
"""Cancel latest timer and start a new one if `_token` is still there.
"""
print('{} {}: CANCELED'.format(datetime.now(), self._timer.name))
self._timer.cancel()
# reduces, but doesn't prevent, occurrences when a new timer
# gets created which eventually will not succeed in popping
# the `_token`. That's uncritical redundancy when it happens.
# Only one thread ever will be able to execute `self.func()`
if hasattr(self, "_token"):
self._init_timer(interval)
self.start()
def cancel(self):
self._timer.cancel()
def join(self, timeout=None):
self._timer.join(timeout=timeout)
def run_demo(initial_interval):
print("*** testing with initial interval {} ***".format(initial_interval))
tl = TimeLord(interval=initial_interval, function=f, args=(10,))
tl.start()
print('*** {} sleeping two seconds ***'.format(datetime.now()))
time.sleep(2)
tl.reset(interval=6)
tl.reset(interval=7)
tl.join()
print("-" * 70)
if __name__ == '__main__':
run_demo(initial_interval=5)
run_demo(initial_interval=2)
示例输出:
*** testing with initial interval 5 ***
2019-06-05 20:58:23.448404 Thread-1: STARTED with `interval=5`
*** 2019-06-05 20:58:23.448428 sleeping two seconds ***
2019-06-05 20:58:25.450483 Thread-1: CANCELED
2019-06-05 20:58:25.450899 Thread-2: STARTED with `interval=6`
2019-06-05 20:58:25.450955 Thread-2: CANCELED
2019-06-05 20:58:25.451496 Thread-3: STARTED with `interval=7`
2019-06-05 20:58:32.451592 Thread-3: RUNNING TARGET FUNCTION
2019-06-05 20:58:42.457527 Thread-3: EXITING
----------------------------------------------------------------------
*** testing with initial interval 2 ***
2019-06-05 20:58:42.457986 Thread-4: STARTED with `interval=2`
*** 2019-06-05 20:58:42.458033 sleeping two seconds ***
2019-06-05 20:58:44.458058 Thread-4: RUNNING TARGET FUNCTION
2019-06-05 20:58:44.459649 Thread-4: CANCELED
2019-06-05 20:58:44.459724 Thread-4: CANCELED
2019-06-05 20:58:54.466342 Thread-4: EXITING
----------------------------------------------------------------------
Process finished with exit code 0
请注意,在 interval=2 的情况下,两秒后的取消无效,因为计时器已经在执行目标函数。
我想开发一个 python 程序,从某个时刻开始,等待 60 秒再执行一个动作。该程序必须具有的另一个功能是,如果我更新初始时间,它必须开始检查条件。我想过用线程来做,但我不知道如何停止线程并在新的开始时间重新启动它。
import thread
import time
# Define a function for the thread
def check_message (last, timer):
oldtime = time.time()
print oldtime
# check
while time.time() - oldtime <= 60:
print (time.time() - oldtime)
print "One minute"+ str(time.time())
return 1
# Create two threads as follows
try:
named_tuple = time.localtime() # get struct_time
time_string = time.strftime("%H:%M:%S", named_tuple)
thread.start_new_thread(check_message , (time_string, 60))
except:
print "Error: unable to start thread"
while 1:
pass
谢谢!
一个选择是在线程外进行检查,这样主循环每 60 秒执行一个线程来执行 X 作业:
import threading
import time
# Define a function for the thread
def check_message():
print("One minute"+ str(time.time()))
return 1
last_execution = time.time()
while 1:
if time.time() - last_execution < 60:
time.sleep(1)
else:
last_execution = time.time()
threading.Thread(target=check_message).start()
# Python2:
# import thread
# thread.start_new_thread(check_message)
我将代码替换为适用于 Python3 的语法,因为我没有安装 Python2。但是总体思路在两个版本中应该是一样的。
这里可能没有必要在循环中检查时间并且很浪费,因为您可以让线程休眠并让内核在时间到了时将其唤醒。
线程库为此类用例提供 threading.Timer
。你的问题是,你不能中断这样一个睡眠线程来调整执行指定函数的时间间隔。
我在下面的示例中使用自定义管理器-class TimeLord
来克服此限制。 TimeLord
通过取消当前计时器并用新计时器替换它来启用 "resetting" 计时器。
为此,TimeLord
包含一个包装中间工作函数和一个 "token" 属性,必须由 运行 计时器实例弹出以执行指定的目标函数。
由于 dict.pop()
是原子操作,因此这种设计保证了指定目标函数的唯一执行。 timelord.reset()
只要当前计时器还没有启动它的线程并弹出 _token
就有效。这种方法不能完全防止在尝试 "reset" 时新定时器线程可能无效的启动,但是当它发生时它是一种不重要的冗余,因为目标函数只能执行一次。
此代码使用 Python 2 和 3 运行:
import time
from datetime import datetime
from threading import Timer, current_thread
def f(x):
print('{} {}: RUNNING TARGET FUNCTION'.format(
datetime.now(), current_thread().name)
)
time.sleep(x)
print('{} {}: EXITING'.format(datetime.now(), current_thread().name))
class TimeLord:
"""
Manager Class for threading.Timer instance. Allows "resetting" `interval`
as long execution of `function` has not started by canceling the old
and constructing a new timer instance.
"""
def worker(self, *args, **kwargs):
try:
self.__dict__.pop("_token") # dict.pop() is atomic
except KeyError:
pass
else:
self.func(*args, **kwargs)
def __init__(self, interval, function, args=None, kwargs=None):
self.func = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self._token = True
self._init_timer(interval)
def _init_timer(self, interval):
self._timer = Timer(interval, self.worker, self.args, self.kwargs)
self._timer.daemon = True
def start(self):
self._timer.start()
print('{} {}: STARTED with `interval={}`'.format(
datetime.now(), self._timer.name, self._timer.interval)
)
def reset(self, interval):
"""Cancel latest timer and start a new one if `_token` is still there.
"""
print('{} {}: CANCELED'.format(datetime.now(), self._timer.name))
self._timer.cancel()
# reduces, but doesn't prevent, occurrences when a new timer
# gets created which eventually will not succeed in popping
# the `_token`. That's uncritical redundancy when it happens.
# Only one thread ever will be able to execute `self.func()`
if hasattr(self, "_token"):
self._init_timer(interval)
self.start()
def cancel(self):
self._timer.cancel()
def join(self, timeout=None):
self._timer.join(timeout=timeout)
def run_demo(initial_interval):
print("*** testing with initial interval {} ***".format(initial_interval))
tl = TimeLord(interval=initial_interval, function=f, args=(10,))
tl.start()
print('*** {} sleeping two seconds ***'.format(datetime.now()))
time.sleep(2)
tl.reset(interval=6)
tl.reset(interval=7)
tl.join()
print("-" * 70)
if __name__ == '__main__':
run_demo(initial_interval=5)
run_demo(initial_interval=2)
示例输出:
*** testing with initial interval 5 ***
2019-06-05 20:58:23.448404 Thread-1: STARTED with `interval=5`
*** 2019-06-05 20:58:23.448428 sleeping two seconds ***
2019-06-05 20:58:25.450483 Thread-1: CANCELED
2019-06-05 20:58:25.450899 Thread-2: STARTED with `interval=6`
2019-06-05 20:58:25.450955 Thread-2: CANCELED
2019-06-05 20:58:25.451496 Thread-3: STARTED with `interval=7`
2019-06-05 20:58:32.451592 Thread-3: RUNNING TARGET FUNCTION
2019-06-05 20:58:42.457527 Thread-3: EXITING
----------------------------------------------------------------------
*** testing with initial interval 2 ***
2019-06-05 20:58:42.457986 Thread-4: STARTED with `interval=2`
*** 2019-06-05 20:58:42.458033 sleeping two seconds ***
2019-06-05 20:58:44.458058 Thread-4: RUNNING TARGET FUNCTION
2019-06-05 20:58:44.459649 Thread-4: CANCELED
2019-06-05 20:58:44.459724 Thread-4: CANCELED
2019-06-05 20:58:54.466342 Thread-4: EXITING
----------------------------------------------------------------------
Process finished with exit code 0
请注意,在 interval=2 的情况下,两秒后的取消无效,因为计时器已经在执行目标函数。