RQ 超时不会终止多线程作业

RQ Timeout does not kill multi-threaded jobs

我在使用 python RQ(在 v0.5.6 和 v0.6.0 上测试)运行宁多线程任务时遇到问题。

考虑以下代码,作为我要实现的目标的简化版本:

thing.py

from threading import Thread

class MyThing(object):
    def say_hello(self):
        while True:
            print "Hello World"

    def hello_task(self):
        t = Thread(target=self.say_hello)
        t.daemon = True # seems like it makes no difference
        t.start()
        t.join()

main.py

from rq import Queue
from redis import Redis
from thing import MyThing

conn = Redis()

q = Queue(connection=conn)

q.enqueue(MyThing().say_hello, timeout=5)

当执行 main.py 时(当 rqworker 在后台 运行ning 时),作业按预期在 5 秒内超时中断。

问题是,当我设置一个包含 thread/s 的任务(例如 MyThing().hello_task)时,线程永远 运行 并且在 5 秒超时结束时没有任何反应。

我如何运行一个带有RQ的多线程任务,这样超时就会杀死任务,它的儿子,孙子和他们的妻子?

当你 运行 t.join() 时,hello_task 线程阻塞并等待 say_hello 线程 returns - 因此没有收到来自 rq 的超时信号.您可以允许主线程 运行 并通过使用 Thread.join 和设置的等待时间正确接收超时信号,同时等待线程完成 运行ning。像这样:

def hello_task(self):
    t = Thread(target=self.say_hello)
    t.start()
    while t.isAlive():
        t.join(1)  # Block for 1 second

这样你也可以捕获超时异常并处理它,如果你愿意的话:

def hello_task(self):
    t = Thread(target=self.say_hello)
    t.start()
    try:
        while t.isAlive():
            t.join(1)  # Block for 1 second
    except JobTimeoutException:  # From rq.timeouts.JobTimeoutException
        print "Thread killed due to timeout"
        raise