python rq - how to trigger a job when multiple other jobs are finished? Multi-job dependency workaround?
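I have a nested job structure in my python redis queue. First the rncopy job is executed. Once it finishes, 3 dependent registration jobs follow. When all 3 of these jobs have finished computing, I want to trigger a job that sends a websocket notification to my frontend.
My current attempt: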
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
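Unfortunately, the multi-job dependency feature seems never to have been merged into master. I see there are currently two pull requests for it on git. Is there a workaround I can use?
Sorry for not providing a reproducible example.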
New versions (RQ >= 1.8)
You can simply use the depends_on parameter, passing a list or a tuple.
rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
# you can also use a list instead of a tuple:
# notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=[t1c_reg, t2_reg, fla_reg])
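To make the fan-in semantics concrete without a Redis server, here is a minimal in-memory sketch using the standard library's `graphlib.TopologicalSorter` (Python 3.9+). It is not RQ's implementation; it only models the rule the `depends_on=(t1c_reg, t2_reg, fla_reg)` call relies on: a job becomes ready once every job it depends on has finished. The job names mirror the question, and `run_in_waves` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each key lists the jobs it depends on (mirrors the depends_on arguments above).
dependencies = {
    "rncopy": set(),
    "t1c_reg": {"rncopy"},
    "t2_reg": {"rncopy"},
    "fla_reg": {"rncopy"},
    "notify": {"t1c_reg", "t2_reg", "fla_reg"},
}


def run_in_waves(graph):
    """Group jobs into waves; all jobs in one wave could run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # jobs whose dependencies have all finished
        waves.append(ready)
        sorter.done(*ready)  # pretend the whole wave finished
    return waves


print(run_in_waves(dependencies))
# [['rncopy'], ['fla_reg', 't1c_reg', 't2_reg'], ['notify']]
```

The notification job only appears in the last wave, after all three registrations, which is exactly what a multi-job dependency buys you.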
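Older versions (RQ < 1.8)
I use this workaround: if there are n dependencies, I create n-1 wrappers around the real function, each wrapper depending on a different job.
This solution is a bit convoluted, but it works.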
from rq import Queue, get_current_job

def first_wrapper(patient_finished, patientid, t2_reg_id, fla_reg_id):
    # Runs after t1c_reg; recreate the queue by name, reusing the worker's Redis connection
    queue = Queue('YOUR-QUEUE-NAME', connection=get_current_job().connection)
    queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)

def second_wrapper(patient_finished, patientid, fla_reg_id):
    # Runs after t2_reg; the real final job waits on the last remaining dependency
    queue = Queue('YOUR-QUEUE-NAME', connection=get_current_job().connection)
    queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)

rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid, t2_reg.id, fla_reg.id, timeout=6000, depends_on=t1c_reg)
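Some notes:
I don't pass the queue object to the wrappers, because that causes serialization problems; the queue therefore has to be recovered by name.
For the same reason, I pass job.id (rather than the job object) to the wrappers.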
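Why the chain is safe: each wrapper is only enqueued after its predecessor finishes, and then waits on the next dependency. If that dependency has already finished, RQ enqueues the job right away instead of deferring it. The sketch below simulates this with a tiny single-dependency scheduler in plain Python. `MiniQueue` and `make_chain` are hypothetical, not part of RQ; `make_chain` generalizes the hand-written `first_wrapper`/`second_wrapper` pair to any n, and shows that the final job runs only after all n dependencies, whatever order they finish in.

```python
class MiniQueue:
    """Hypothetical stand-in for a queue that allows only ONE dependency per job."""

    def __init__(self):
        self.finished = set()  # ids of jobs that have completed
        self.waiting = {}      # dependency id -> callables deferred until it finishes

    def enqueue(self, func, depends_on=None):
        # Run immediately when there is no dependency or it has already finished.
        if depends_on is None or depends_on in self.finished:
            func()
        else:
            self.waiting.setdefault(depends_on, []).append(func)

    def finish(self, job_id):
        # Mark a "real" job as done and release everything deferred on it.
        self.finished.add(job_id)
        for func in self.waiting.pop(job_id, []):
            func()


def make_chain(queue, dep_ids, final):
    """Make `final` run after every id in dep_ids, using n-1 chained wrappers."""
    def wrapper(remaining):
        if len(remaining) == 1:
            # Last link: the real final job waits on the last dependency.
            queue.enqueue(final, depends_on=remaining[0])
        else:
            # Wait on one dependency, then enqueue the next wrapper for the rest.
            queue.enqueue(lambda: wrapper(remaining[1:]), depends_on=remaining[0])

    if not dep_ids:
        queue.enqueue(final)
    else:
        wrapper(list(dep_ids))


# Usage: the registrations finish out of order, yet "notify" still runs last.
log = []
q = MiniQueue()
make_chain(q, ["t1c_reg", "t2_reg", "fla_reg"], lambda: log.append("notify"))
q.finish("fla_reg")
q.finish("t1c_reg")
print(log)  # []
q.finish("t2_reg")
print(log)  # ['notify']
```

Note that `fla_reg` finishing first is harmless: by the time the last wrapper enqueues the final job on it, it is already done, so the final job starts immediately.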
I created "rq-manager" to solve similar problems with multiple and tree-like dependencies: https://github.com/crispyDyne/rq-manager
A project structure with multiple dependencies looks like this:
def simpleTask(x):
    return 2*x
project = {'jobs':[
    {
        'blocking':True, # this job must finish before moving on.
        'func':simpleTask, 'args': 0
    },
    {
        'blocking':True, # this job, and its child jobs, must finish before moving on.
        'jobs':[ # these child jobs will run in parallel
            {'func':simpleTask, 'args': 1},
            {'func':simpleTask, 'args': 2},
            {'func':simpleTask, 'args': 3}],
    },
    { # this job will only run when the blocking jobs above finish.
        'func':simpleTask, 'args': 4
    }
]}
Then pass it to the manager to run:
from redis import Redis
from rq import Queue
from rq_manager import manager, getProjectResults

q = Queue(connection=Redis())
managerJob = q.enqueue(manager, project)
projectResults = getProjectResults(managerJob)
which returns:
projectResults = [0, [2, 4, 6], 8]
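To show how the nested project structure maps onto the nested result list, here is a purely sequential sketch in plain Python. `run_project` is hypothetical and is not rq-manager's implementation: it ignores `'blocking'` and parallelism (running everything in order trivially satisfies the blocking constraints) and only reproduces the shape of the results. Treating a non-tuple `'args'` value as a single argument is an assumption based on the `'args': 0` form used above.

```python
def simpleTask(x):
    return 2 * x


def run_project(node):
    """Hypothetical sequential interpreter for the project dict shape shown above."""
    if "jobs" in node:
        # A group: run each child in order and collect the results in a list.
        return [run_project(child) for child in node["jobs"]]
    args = node.get("args", ())
    if not isinstance(args, tuple):
        args = (args,)  # assumption: a bare value is a single positional argument
    return node["func"](*args)


project = {'jobs': [
    {'blocking': True, 'func': simpleTask, 'args': 0},
    {'blocking': True, 'jobs': [
        {'func': simpleTask, 'args': 1},
        {'func': simpleTask, 'args': 2},
        {'func': simpleTask, 'args': 3}]},
    {'func': simpleTask, 'args': 4},
]}

print(run_project(project))  # [0, [2, 4, 6], 8]
```

Each group in the project becomes a nested list in the result, which is why the three parallel child jobs show up as `[2, 4, 6]`.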
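When the dependent jobs need results from their parent, I create a function that runs the first job and then adds the other jobs to the project. So, for your example: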
def firstTask(patientid, imagepath):
    raw_nifti_result = raw_nifti_copymachine(patientid, imagepath)
    moreTasks = {'jobs':[
        {'func':modality_registrator, 'args':(patientid, "t1c", raw_nifti_result)},
        {'func':modality_registrator, 'args':(patientid, "t2", raw_nifti_result)},
        {'func':modality_registrator, 'args':(patientid, "fla", raw_nifti_result)},
    ]}
    # returning a dictionary with an "addJobs" key will add those tasks to the project.
    return {'result':raw_nifti_result, 'addJobs':moreTasks}
The project looks like this:
project = {'jobs':[
    {'blocking':True, # this job, and its child jobs, must finish before moving on.
     'jobs':[
        {
            'func':firstTask, 'args':(patientid, imagepath),
            'blocking':True, # this job must finish before moving on.
        },
        # "moreTasks" will be added here
     ]
    },
    { # this job will only run when the blocking jobs above finish.
        'func':print, 'args': (patient_finished, patientid)
    }
]}
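One plausible reading of the `"addJobs"` mechanism, sketched sequentially in plain Python: when a job returns `{'result': ..., 'addJobs': {...}}`, its result is recorded and the returned group is spliced into the same parent group right after it, where the `# "moreTasks" will be added here` comment sits. `run_group` and the stub task functions are hypothetical stand-ins, not rq-manager's code or the question's real functions, and the nested result shape follows the `finalJob` comment in this answer rather than rq-manager's source.

```python
# Hypothetical stand-ins for the real task functions from the question.
def raw_nifti_copymachine(patientid, imagepath):
    return f"copied {imagepath} for {patientid}"


def modality_registrator(patientid, modality, raw_result):
    return f"{modality} registered"


def firstTask(patientid, imagepath):
    raw = raw_nifti_copymachine(patientid, imagepath)
    more = {'jobs': [{'func': modality_registrator, 'args': (patientid, m, raw)}
                     for m in ("t1c", "t2", "fla")]}
    return {'result': raw, 'addJobs': more}


def run_group(jobs):
    """Hypothetical sequential runner for a list of jobs; NOT rq-manager's code."""
    jobs = list(jobs)  # copy, so splicing never mutates the caller's project
    results = []
    i = 0
    while i < len(jobs):  # len() is re-read each pass, so spliced-in jobs are run
        job = jobs[i]
        if 'jobs' in job:
            results.append(run_group(job['jobs']))
        else:
            out = job['func'](*job['args'])
            if isinstance(out, dict) and 'addJobs' in out:
                # Insert the returned group right after this job, in the same parent.
                jobs.insert(i + 1, out['addJobs'])
                out = out['result']
            results.append(out)
        i += 1
    return results


patientid, imagepath = "p01", "/data/p01.nii"  # hypothetical inputs
project = {'jobs': [
    {'blocking': True, 'jobs': [
        {'func': firstTask, 'args': (patientid, imagepath), 'blocking': True},
    ]},
]}

print(run_group(project['jobs']))
```

Because the added group lands inside the first (blocking) group, any job placed after that group at the top level still waits for all three registrations.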
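If the final job needs the results of the previous jobs, set the "previousJobArgs" flag. "finalJob" will then receive an array of the previous results, with the child jobs' results as nested arrays.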
def finalJob(previousResults):
    # previousResults = [
    #     raw_nifti_copymachine(patientid, imagepath),
    #     [
    #         modality_registrator(patientid, "t1c", raw_nifti_result),
    #         modality_registrator(patientid, "t2", raw_nifti_result),
    #         modality_registrator(patientid, "fla", raw_nifti_result),
    #     ]
    # ]
    return doSomethingWith(previousResults)
Then the project looks like this:
project = {'jobs':[
    {
        #'blocking':True, # Blocking not needed.
        'jobs':[
            {
                'func':firstTask, 'args':(patientid, imagepath),
                'blocking':True, # this job must finish before moving on.
            },
            # "moreTasks" will be added here
        ]
    },
    { # This job will wait, since it needs the previous jobs' results.
        'func':finalJob, 'previousJobArgs': True # it gets all the previous jobs' results
    }
]}
Hopefully https://github.com/rq/rq/issues/260 gets implemented and my solution becomes obsolete!