Elegantly wait until a job in redis queue is done, without busy wait?
I'm trying to adopt Redis Queue in our current system. A job is dispatched to another module, and the caller should wait until the job is done, take the result from `job.result`, and then continue:
```
with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)
    print("waiting for result")
    print(datetime.datetime.now())
    while job.result is None:
        pass
    print(datetime.datetime.now())
    print("got result")
    # next step
    next_step_func(job.result)
    ...
```
I'm facing two problems here:

1. The busy wait on `while job.result is None` takes a long time. The processing in `worker_func` only takes about 2-3 seconds (it calls an API on another server), but the busy wait on `while job.result is None` itself takes another >= 3 seconds, bringing the total to >= 5 seconds. I'm sure the waiting continues after `worker_func` has finished, because I added logs for both `worker_func` and the `while job.result is None` loop:
```
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009
```
As you can see above, the busy-wait loop was still running after `worker_func` had finished.
2. Is there a more elegant way to wait synchronously here, instead of a busy loop? The busy loop is definitely not the best implementation, since it burns a lot of CPU.
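The only mitigation I've come up with so far is to sleep between checks so the loop at least yields the CPU; a minimal sketch (the 100 ms interval is just a guess to tune, and each check of `job.result` still round-trips to Redis):

```
import time

with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)
    # Poll at a coarse interval instead of spinning in a tight loop;
    # time.sleep yields the CPU between checks.
    while job.result is None:
        time.sleep(0.1)
    next_step_func(job.result)
```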
Thanks!
-- Edited the code above to give clearer context
I need to return the value of `next_step_func(job.result)` from the place where `job_queue.enqueue` is called, so the clearer structure is:
```
def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)
        print("waiting for result")
        print(datetime.datetime.now())
        while job.result is None:
            pass
        print(datetime.datetime.now())
        print("got result")
        # next step
        return next_step_func(job.result)
        ...
```
So the pain point is that I need `job.result` to be returned from `endpoint()`, but a job callback takes my job into the different context of `on_success`.
The documentation suggests job callbacks as an option:
```
def job_succeeded(job, connection, result, *args, **kwargs):
    next_step_func(job.result)

def job_failed(job, connection, type, value, traceback):
    # react to the error
    pass

with Connection(redis_connection):
    args = (func_input1, func_input2)
    job_queue.enqueue(worker_func, args=args, on_success=job_succeeded, on_failure=job_failed)
```
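The callback, however, runs in the worker process, so it still can't hand the value back to `endpoint()`. One idea I'm considering is to have the worker push its result onto a per-job Redis list and let `endpoint()` block on BLPOP, which waits inside Redis instead of polling; a sketch (the key scheme and `call_remote_api` are placeholders, and `worker_func` would need to be modified):

```
import json
from rq import get_current_job

RESULT_KEY = "job_result:{}"  # hypothetical key scheme: one list per job id

def worker_func(func_input1, func_input2):
    result = call_remote_api(func_input1, func_input2)  # placeholder for the real work
    # Wake the waiting caller immediately by pushing onto the per-job list.
    job = get_current_job()
    job.connection.rpush(RESULT_KEY.format(job.id), json.dumps(result))
    return result

def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)
    # BLPOP blocks inside Redis until the worker pushes, so there is no
    # client-side polling; the timeout guards against a crashed worker.
    item = redis_connection.blpop(RESULT_KEY.format(job.id), timeout=30)
    if item is None:
        raise TimeoutError("worker did not finish within 30 seconds")
    _key, payload = item
    return next_step_func(json.loads(payload))
```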