RQ 超时不会终止多线程作业
RQ Timeout does not kill multi-threaded jobs
我在使用 python RQ(在 v0.5.6 和 v0.6.0 上测试)运行宁多线程任务时遇到问题。
考虑以下代码,作为我要实现的目标的简化版本:
thing.py
from threading import Thread
class MyThing(object):
def say_hello(self):
while True:
print "Hello World"
def hello_task(self):
t = Thread(target=self.say_hello)
t.daemon = True # seems like it makes no difference
t.start()
t.join()
main.py
from rq import Queue
from redis import Redis
from thing import MyThing
conn = Redis()
q = Queue(connection=conn)
q.enqueue(MyThing().say_hello, timeout=5)
当执行 main.py
时(当 rqworker 在后台 运行ning 时),作业按预期在 5 秒内超时中断。
问题是,当我设置一个包含 thread/s 的任务(例如 MyThing().hello_task
)时,线程永远 运行 并且在 5 秒超时结束时没有任何反应。
我如何运行一个带有RQ的多线程任务,这样超时就会杀死任务,它的儿子,孙子和他们的妻子?
当你 运行 t.join()
时,hello_task
线程阻塞并等待 say_hello
线程 returns - 因此没有收到来自 rq 的超时信号.您可以允许主线程 运行 并通过使用 Thread.join
和设置的等待时间正确接收超时信号,同时等待线程完成 运行ning。像这样:
def hello_task(self):
t = Thread(target=self.say_hello)
t.start()
while t.isAlive():
t.join(1) # Block for 1 second
这样你也可以捕获超时异常并处理它,如果你愿意的话:
def hello_task(self):
t = Thread(target=self.say_hello)
t.start()
try:
while t.isAlive():
t.join(1) # Block for 1 second
except JobTimeoutException: # From rq.timeouts.JobTimeoutException
print "Thread killed due to timeout"
raise
我在使用 python RQ(在 v0.5.6 和 v0.6.0 上测试)运行宁多线程任务时遇到问题。
考虑以下代码,作为我要实现的目标的简化版本:
thing.py
from threading import Thread
class MyThing(object):
def say_hello(self):
while True:
print "Hello World"
def hello_task(self):
t = Thread(target=self.say_hello)
t.daemon = True # seems like it makes no difference
t.start()
t.join()
main.py
from rq import Queue
from redis import Redis
from thing import MyThing
conn = Redis()
q = Queue(connection=conn)
q.enqueue(MyThing().say_hello, timeout=5)
当执行 main.py
时(当 rqworker 在后台 运行ning 时),作业按预期在 5 秒内超时中断。
问题是,当我设置一个包含 thread/s 的任务(例如 MyThing().hello_task
)时,线程永远 运行 并且在 5 秒超时结束时没有任何反应。
我如何运行一个带有RQ的多线程任务,这样超时就会杀死任务,它的儿子,孙子和他们的妻子?
当你 运行 t.join()
时,hello_task
线程阻塞并等待 say_hello
线程 returns - 因此没有收到来自 rq 的超时信号.您可以允许主线程 运行 并通过使用 Thread.join
和设置的等待时间正确接收超时信号,同时等待线程完成 运行ning。像这样:
def hello_task(self):
t = Thread(target=self.say_hello)
t.start()
while t.isAlive():
t.join(1) # Block for 1 second
这样你也可以捕获超时异常并处理它,如果你愿意的话:
def hello_task(self):
t = Thread(target=self.say_hello)
t.start()
try:
while t.isAlive():
t.join(1) # Block for 1 second
except JobTimeoutException: # From rq.timeouts.JobTimeoutException
print "Thread killed due to timeout"
raise