如何在 python 中创建基于间隔函数调用的后台线程?
How to create a background threaded on interval function call in python?
我正在尝试实现在后台运行的检测信号调用。我如何创建一个线程间隔调用,比如每 30 秒调用一次以下函数:
self.mqConn.heartbeat_tick()
另外,我该如何停止这个话题?
非常感谢。
一种方法是像这样使用 circuits 应用程序框架:
from circuits import Component, Event, Timer
class App(Component):
def init(self, mqConn):
self.mqConn = mqConn
Timer(30, Event.create("heartbeat"), persist=True).register(self)
def heartbeat(self):
self.mqConn.heartbeat_tick()
App().run()
注:我是电路作者:)
这只是一个基本的想法和结构 -- 您需要对其进行调整以适合您的具体应用和要求!
使用包含循环的线程
from threading import Thread
import time
def background_task():
while not background_task.cancelled:
self.mqConn.heartbeat_tick()
time.sleep(30)
background_task.cancelled = False
t = Thread(target=background_task)
t.start()
background_task.cancelled = True
或者,您可以将计时器子类化,以便于取消:
from threading import Timer
class RepeatingTimer(Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
t = RepeatingTimer(30.0, self.mqConn.heartbeat_tick)
t.start() # every 30 seconds, call heartbeat_tick
# later
t.cancel() # cancels execution
或者您可以在线程模块中使用计时器 class:
from threading import Timer
def hello():
print "hello, world"
t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
t.cancel() # cancels execution, this only works before the 30 seconds is elapsed
这不会每 x 秒启动一次,而是将线程延迟 x 秒执行。但是你仍然可以把它放在一个循环中并使用 t.is_alive() 来查看它的状态。
Eric's answer: you can't subclass Timer
in python 2, since it's actually a light function wrapper around the true class: _Timer
. If you do you'll get the issue that pops up in this post 的快速跟进。
改用 _Timer
修复它:
from threading import _Timer
class RepeatingTimer(_Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
t = RepeatingTimer(30.0, self.mqConn.heartbeat_tick)
t.start() # every 30 seconds, call heartbeat_tick
# later
t.cancel() # cancels execution
我正在尝试实现在后台运行的检测信号调用。我如何创建一个线程间隔调用,比如每 30 秒调用一次以下函数:
self.mqConn.heartbeat_tick()
另外,我该如何停止这个话题?
非常感谢。
一种方法是像这样使用 circuits 应用程序框架:
from circuits import Component, Event, Timer
class App(Component):
def init(self, mqConn):
self.mqConn = mqConn
Timer(30, Event.create("heartbeat"), persist=True).register(self)
def heartbeat(self):
self.mqConn.heartbeat_tick()
App().run()
注:我是电路作者:)
这只是一个基本的想法和结构 -- 您需要对其进行调整以适合您的具体应用和要求!
使用包含循环的线程
from threading import Thread
import time
def background_task():
while not background_task.cancelled:
self.mqConn.heartbeat_tick()
time.sleep(30)
background_task.cancelled = False
t = Thread(target=background_task)
t.start()
background_task.cancelled = True
或者,您可以将计时器子类化,以便于取消:
from threading import Timer
class RepeatingTimer(Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
t = RepeatingTimer(30.0, self.mqConn.heartbeat_tick)
t.start() # every 30 seconds, call heartbeat_tick
# later
t.cancel() # cancels execution
或者您可以在线程模块中使用计时器 class:
from threading import Timer
def hello():
print "hello, world"
t = Timer(30.0, hello)
t.start() # after 30 seconds, "hello, world" will be printed
t.cancel() # cancels execution, this only works before the 30 seconds is elapsed
这不会每 x 秒启动一次,而是将线程延迟 x 秒执行。但是你仍然可以把它放在一个循环中并使用 t.is_alive() 来查看它的状态。
Eric's answer: you can't subclass Timer
in python 2, since it's actually a light function wrapper around the true class: _Timer
. If you do you'll get the issue that pops up in this post 的快速跟进。
改用 _Timer
修复它:
from threading import _Timer
class RepeatingTimer(_Timer):
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
t = RepeatingTimer(30.0, self.mqConn.heartbeat_tick)
t.start() # every 30 seconds, call heartbeat_tick
# later
t.cancel() # cancels execution