如何在 python-rq 中创建计划作业和排队作业之间的 ``depends_on`` 关系
How to create a ``depends_on`` relationship between scheduled and queued jobs in python-rq
我有一个 Web 服务(Python 3.7,Flask 1.0.2),其工作流程包括 3 个步骤:
- 第 1 步:将远程计算作业提交到商业排队系统(IBM 的 LSF)
- 第 2 步:每 61 秒轮询一次远程计算作业状态(61 秒是因为缓存的作业状态结果)
- 步骤 3:数据 post-如果步骤 2 returns 远程计算作业状态 == "DONE"
的处理
远程计算作业的长度是任意的(秒到天之间),每一步都取决于前一步的完成:
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
job1 = q.enqueue(step1)
job2 = q.enqueue(step2, depends_on=job1)
job3 = q.enqueue(step3, depends_on=job2)
但是,最终所有工作人员(4 个工作人员)都将进行轮询(4 个客户端请求中的第 2 步),同时他们应该继续执行其他传入请求的第 1 步和已成功通过第 2 步的工作流的第 3 步。
每次投票后应释放工人。他们应该定期返回步骤 2 进行下一次轮询(每个作业最多每 61 秒一次),如果远程计算作业轮询没有 return "DONE" 重新排队轮询作业。
此时我开始使用rq-scheduler
(因为间隔和重新排队功能听起来很有希望):
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
s = Scheduler('default')
job1 = q.enqueue(step1, REQ_ID)
job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
job2.meta['interval'] = 61
job2.origin = 'default'
job2.save()
s.enqueue_job(job2)
job3 = q.enqueue(step3, REQ_ID, depends_on=job2)
Job2 被正确创建(包括与 job1 的 depends_on
关系,但 s.enqueue_job() 直接执行它,忽略它与 job1 的关系。([=56= 的函数文档字符串]() 其实说的是立即执行...).
当 job2 被放入调度程序而不是队列时,如何在 job1、job2 和 job3 之间创建 depends_on
关系? (或者,我如何将 job2 交给调度程序,而不是立即执行 job2 并等待 job1 完成?)
出于测试目的,步骤如下所示:
def step1():
print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
time.sleep(20)
print(f' <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
return True
def step2():
print(f' --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
time.sleep(10)
print(f' <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
return True
def step3():
print(f' --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
time.sleep(10)
print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
return True
我收到的输出是这样的:
worker_1 | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2 | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2 | --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1 | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...
job2 没有等待 job1 完成...
#requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1
我对这个问题的解决方案仅使用 rq
(不再使用 rq_scheduler
):
升级到最新的python-rq包:
# requirements.txt
...
rq==1.1.0
为轮询作业创建专用队列,并相应地使作业入队(具有 depends_on
关系):
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue('default')
p = Queue('pqueue')
job1 = q.enqueue(step1)
job2 = p.enqueue(step2, depends_on=job1) # step2 enqueued in polling queue
job3 = q.enqueue(step3, depends_on=job2)
为轮询队列派生一个专用工作器。它继承自标准 Worker
class:
class PWorker(rq.worker.Worker):
def execute_job(self, *args, **kwargs):
seconds_between_polls = 65
job = args[0]
if 'lastpoll' in job.meta:
job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()
if job_timedelta < seconds_between_polls:
sleep_period = seconds_between_polls - job_timedelta
time.sleep(sleep_period)
job.meta['lastpoll'] = datetime.utcnow()
job.save_meta()
super().execute_job(*args, **kwargs)
PWorker 通过向作业的元数据 'lastpoll'
添加时间戳来扩展 execute_job
方法。
如果轮询作业进入,具有 lastpoll
时间戳,工作人员将检查自 lastpoll
以来的时间段是否大于 65 秒。如果是,它将当前时间写入 'lastpoll'
并执行轮询。如果不是,它会一直睡到 65s 结束,然后将当前时间写入 'lastpoll'
并执行轮询。没有 lastpoll
时间戳的工作是第一次轮询,工作人员创建时间戳并执行轮询。
创建一个专用的异常(由任务函数抛出)和一个异常处理器来处理它:
# exceptions.py
class PACError(Exception):
pass
class PACJobRun(PACError):
pass
class PACJobExit(PACError):
pass
# exception_handlers.py
def poll_exc_handler(job, exc_type, exc_value, traceback):
if exc_type is PACJobRun:
requeue_job(job.get_id(), connection=job.connection)
return False # no further exception handling
else:
return True # further exception handling
# tasks.py
def step2():
# GET request to remote compute job portal API for status
# if response == "RUN":
raise PACJobRun
return True
当自定义异常处理程序捕获到自定义异常(这意味着远程计算作业仍然是 运行)时,它会在轮询队列中重新排队作业。
将自定义异常处理程序放入异常处理层次结构中:
# manage.py
@cli.command('run_pworker')
def run_pworker():
redis_url = app.config['REDIS_URL']
redis_connection = redis.from_url(redis_url)
with rq.connections.Connection(redis_connection):
pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler])
pworker.work()
这个解决方案的优点在于它仅用几行额外代码就扩展了 python-rq 的标准功能。另一方面,额外的队列和工作人员增加了复杂性……
我有一个 Web 服务(Python 3.7,Flask 1.0.2),其工作流程包括 3 个步骤:
- 第 1 步:将远程计算作业提交到商业排队系统(IBM 的 LSF)
- 第 2 步:每 61 秒轮询一次远程计算作业状态(61 秒是因为缓存的作业状态结果)
- 步骤 3:数据 post-如果步骤 2 returns 远程计算作业状态 == "DONE" 的处理
远程计算作业的长度是任意的(秒到天之间),每一步都取决于前一步的完成:
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
job1 = q.enqueue(step1)
job2 = q.enqueue(step2, depends_on=job1)
job3 = q.enqueue(step3, depends_on=job2)
但是,最终所有工作人员(4 个工作人员)都将进行轮询(4 个客户端请求中的第 2 步),同时他们应该继续执行其他传入请求的第 1 步和已成功通过第 2 步的工作流的第 3 步。
每次投票后应释放工人。他们应该定期返回步骤 2 进行下一次轮询(每个作业最多每 61 秒一次),如果远程计算作业轮询没有 return "DONE" 重新排队轮询作业。
此时我开始使用rq-scheduler
(因为间隔和重新排队功能听起来很有希望):
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
s = Scheduler('default')
job1 = q.enqueue(step1, REQ_ID)
job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
job2.meta['interval'] = 61
job2.origin = 'default'
job2.save()
s.enqueue_job(job2)
job3 = q.enqueue(step3, REQ_ID, depends_on=job2)
Job2 被正确创建(包括与 job1 的 depends_on
关系,但 s.enqueue_job() 直接执行它,忽略它与 job1 的关系。([=56= 的函数文档字符串]() 其实说的是立即执行...).
当 job2 被放入调度程序而不是队列时,如何在 job1、job2 和 job3 之间创建 depends_on
关系? (或者,我如何将 job2 交给调度程序,而不是立即执行 job2 并等待 job1 完成?)
出于测试目的,步骤如下所示:
def step1():
print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
time.sleep(20)
print(f' <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
return True
def step2():
print(f' --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
time.sleep(10)
print(f' <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
return True
def step3():
print(f' --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
time.sleep(10)
print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
return True
我收到的输出是这样的:
worker_1 | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2 | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2 | --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1 | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...
job2 没有等待 job1 完成...
#requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1
我对这个问题的解决方案仅使用 rq
(不再使用 rq_scheduler
):
升级到最新的python-rq包:
# requirements.txt ... rq==1.1.0
为轮询作业创建专用队列,并相应地使作业入队(具有
depends_on
关系):with Connection(redis.from_url(current_app.config['REDIS_URL'])): q = Queue('default') p = Queue('pqueue') job1 = q.enqueue(step1) job2 = p.enqueue(step2, depends_on=job1) # step2 enqueued in polling queue job3 = q.enqueue(step3, depends_on=job2)
为轮询队列派生一个专用工作器。它继承自标准
Worker
class:class PWorker(rq.worker.Worker): def execute_job(self, *args, **kwargs): seconds_between_polls = 65 job = args[0] if 'lastpoll' in job.meta: job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds() if job_timedelta < seconds_between_polls: sleep_period = seconds_between_polls - job_timedelta time.sleep(sleep_period) job.meta['lastpoll'] = datetime.utcnow() job.save_meta() super().execute_job(*args, **kwargs)
PWorker 通过向作业的元数据
'lastpoll'
添加时间戳来扩展execute_job
方法。如果轮询作业进入,具有
lastpoll
时间戳,工作人员将检查自lastpoll
以来的时间段是否大于 65 秒。如果是,它将当前时间写入'lastpoll'
并执行轮询。如果不是,它会一直睡到 65s 结束,然后将当前时间写入'lastpoll'
并执行轮询。没有lastpoll
时间戳的工作是第一次轮询,工作人员创建时间戳并执行轮询。创建一个专用的异常(由任务函数抛出)和一个异常处理器来处理它:
# exceptions.py class PACError(Exception): pass class PACJobRun(PACError): pass class PACJobExit(PACError): pass
# exception_handlers.py def poll_exc_handler(job, exc_type, exc_value, traceback): if exc_type is PACJobRun: requeue_job(job.get_id(), connection=job.connection) return False # no further exception handling else: return True # further exception handling
# tasks.py def step2(): # GET request to remote compute job portal API for status # if response == "RUN": raise PACJobRun return True
当自定义异常处理程序捕获到自定义异常(这意味着远程计算作业仍然是 运行)时,它会在轮询队列中重新排队作业。
将自定义异常处理程序放入异常处理层次结构中:
# manage.py @cli.command('run_pworker') def run_pworker(): redis_url = app.config['REDIS_URL'] redis_connection = redis.from_url(redis_url) with rq.connections.Connection(redis_connection): pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler]) pworker.work()
这个解决方案的优点在于它仅用几行额外代码就扩展了 python-rq 的标准功能。另一方面,额外的队列和工作人员增加了复杂性……