Scheduling/Queueing 另一个作业中的作业与 redis 队列
Scheduling/Queueing a job within another job with redis queue
Tasker
class 在实例化时设置初始作业。基本上我想要的是在 'main_queue' 中放置一个作业,确定作业是否为 运行 或者是否已经有相同的作业在 'process_queue'、return 中排队当前 'main_queue' 工作。否则在 'process_queue' 中排队作业。当该进程队列完成时,将作业放入 'main_queue'.
然而,'process_queue' 在该持续时间内具有相同的 id 作业,尽管它应该已经完成查看输出。因此,永远不会处理新工作。是否发生了我看不到的死锁?
main_queue 工人
$ rq worker main_queue --with-scheduler
22:44:19 Worker rq:worker:7fe23a24ae404135a10e301f7509eb7e: started, version 1.9.0
22:44:19 Subscribing to channel rq:pubsub:7fe23a24ae404135a10e301f7509eb7e
22:44:19 *** Listening on main_queue...
22:44:19 Trying to acquire locks for main_queue
22:44:19 Scheduler for main_queue started with PID 3747
22:44:19 Cleaning registries for queue: main_queue
22:44:33 main_queue: tasks.redis_test_job() (e90e0dff-bbcc-48ab-afed-6d1ba8b020a8)
None
Job is enqueued to process_queue!
22:44:33 main_queue: Job OK (e90e0dff-bbcc-48ab-afed-6d1ba8b020a8)
22:44:33 Result is kept for 500 seconds
22:44:47 main_queue: tasks.redis_test_job() (1a7f91d0-73f4-466e-92f4-9f918a9dd1e9)
<Job test_job: tasks.print_job()>
!!Scheduler added job to main but same job is already queued in process_queue!!
22:44:47 main_queue: Job OK (1a7f91d0-73f4-466e-92f4-9f918a9dd1e9)
22:44:47 Result is kept for 500 seconds
process_queue 工人
$ rq worker process_queue
22:44:24 Worker rq:worker:d70daf20ff324c18bc17f0ea9576df52: started, version 1.9.0
22:44:24 Subscribing to channel rq:pubsub:d70daf20ff324c18bc17f0ea9576df52
22:44:24 *** Listening on process_queue...
22:44:24 Cleaning registries for queue: process_queue
22:44:33 process_queue: tasks.print_job() (test_job)
The process job executed.
22:44:42 process_queue: Job OK (test_job)
22:44:42 Result is kept for 500 seconds
tasker.py
class Tasker():
def __init__(self):
self.tasker_conn = RedisClient().conn
self.process_queue = Queue(name='process_queue', connection=Redis(),
default_timeout=-1)
self.main_queue = Queue(name='main_queue', connection=Redis(),
default_timeout=-1)
self.__setup_tasks()
def __setup_tasks(self):
self.main_queue.enqueue_in(timedelta(seconds=3), tasks.redis_test_job)
tasks.py
import tasks
def redis_test_job():
q = Queue('process_queue', connection=Redis(), default_timeout=-1)
queued = q.fetch_job('test_job')
print(queued)
if queued:
print("!!Scheduler added job to main but same job is already queued in process_queue!!")
return False
else:
q.enqueue(tasks.print_job, job_id='test_job')
print("Job is enqueued to process_queue!")
return True
def print_job():
sleep(8)
print("The process job executed.")
q = Queue('main_queue', connection=Redis(), default_timeout=-1)
q.enqueue_in(timedelta(seconds=5), tasks.redis_test_job)
来自the docs, enqueued jobs have a result_ttl
that defaults to 500 seconds如果你不定义它。
如果您想将其更改为例如使工作和结果只存在 1 秒,像这样排队你的工作:
q.enqueue(tasks.print_job, job_id='test_job', result_ttl=1)
Tasker
class 在实例化时设置初始作业。基本上我想要的是在 'main_queue' 中放置一个作业,确定作业是否为 运行 或者是否已经有相同的作业在 'process_queue'、return 中排队当前 'main_queue' 工作。否则在 'process_queue' 中排队作业。当该进程队列完成时,将作业放入 'main_queue'.
然而,'process_queue' 在该持续时间内具有相同的 id 作业,尽管它应该已经完成查看输出。因此,永远不会处理新工作。是否发生了我看不到的死锁?
main_queue 工人
$ rq worker main_queue --with-scheduler
22:44:19 Worker rq:worker:7fe23a24ae404135a10e301f7509eb7e: started, version 1.9.0
22:44:19 Subscribing to channel rq:pubsub:7fe23a24ae404135a10e301f7509eb7e
22:44:19 *** Listening on main_queue...
22:44:19 Trying to acquire locks for main_queue
22:44:19 Scheduler for main_queue started with PID 3747
22:44:19 Cleaning registries for queue: main_queue
22:44:33 main_queue: tasks.redis_test_job() (e90e0dff-bbcc-48ab-afed-6d1ba8b020a8)
None
Job is enqueued to process_queue!
22:44:33 main_queue: Job OK (e90e0dff-bbcc-48ab-afed-6d1ba8b020a8)
22:44:33 Result is kept for 500 seconds
22:44:47 main_queue: tasks.redis_test_job() (1a7f91d0-73f4-466e-92f4-9f918a9dd1e9)
<Job test_job: tasks.print_job()>
!!Scheduler added job to main but same job is already queued in process_queue!!
22:44:47 main_queue: Job OK (1a7f91d0-73f4-466e-92f4-9f918a9dd1e9)
22:44:47 Result is kept for 500 seconds
process_queue 工人
$ rq worker process_queue
22:44:24 Worker rq:worker:d70daf20ff324c18bc17f0ea9576df52: started, version 1.9.0
22:44:24 Subscribing to channel rq:pubsub:d70daf20ff324c18bc17f0ea9576df52
22:44:24 *** Listening on process_queue...
22:44:24 Cleaning registries for queue: process_queue
22:44:33 process_queue: tasks.print_job() (test_job)
The process job executed.
22:44:42 process_queue: Job OK (test_job)
22:44:42 Result is kept for 500 seconds
tasker.py
class Tasker():
def __init__(self):
self.tasker_conn = RedisClient().conn
self.process_queue = Queue(name='process_queue', connection=Redis(),
default_timeout=-1)
self.main_queue = Queue(name='main_queue', connection=Redis(),
default_timeout=-1)
self.__setup_tasks()
def __setup_tasks(self):
self.main_queue.enqueue_in(timedelta(seconds=3), tasks.redis_test_job)
tasks.py
import tasks
def redis_test_job():
q = Queue('process_queue', connection=Redis(), default_timeout=-1)
queued = q.fetch_job('test_job')
print(queued)
if queued:
print("!!Scheduler added job to main but same job is already queued in process_queue!!")
return False
else:
q.enqueue(tasks.print_job, job_id='test_job')
print("Job is enqueued to process_queue!")
return True
def print_job():
sleep(8)
print("The process job executed.")
q = Queue('main_queue', connection=Redis(), default_timeout=-1)
q.enqueue_in(timedelta(seconds=5), tasks.redis_test_job)
来自the docs, enqueued jobs have a result_ttl
that defaults to 500 seconds如果你不定义它。
如果您想将其更改为例如使工作和结果只存在 1 秒,像这样排队你的工作:
q.enqueue(tasks.print_job, job_id='test_job', result_ttl=1)