优雅地等待 redis 队列中的作业完成,而不用忙等待?

Elegantly wait until a job in redis queue is done, without busy wait?

我正在尝试在当前系统中实施 redis queuejob 将被发送到另一个模块,它应该等到作业完成并返回结果 job.result,然后继续:

with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)

print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
    pass
print(datetime.datetime.now())
print("got result")

# next step
next_step_func(job.result)

...

我在这里面临 2 个问题:

  1. 忙等,while job.result is None等了很久。我在 worker_func 中的处理时间大约为 2-3 秒,这涉及到在另一台服务器上调用 API,但繁忙的等待 while job.result is None 本身又花费了 >= 3 秒,加起来总共 >= 5 秒。我确信等待发生在 while job.result is None 执行之后,因为我为 worker_funcwhile job.result is None 添加了日志:
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009

正如您在上面看到的,繁忙的等待循环发生在 worker_func 完成之后。

2,有没有其他优雅的方式来实现这里的同步等待而不是忙循环?我认为这里的繁忙循环绝对不是最好的实现,因为它会消耗大量 CPU 资源。

谢谢!

-- 编辑上面的代码以提供更清晰的上下文

我需要从调用 job_queue.enqueue 的地方返回 next_step_func(job.result) 的值。所以更清晰的结构是:

def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)

    print("waiting for result")
    print(datetime.datetime.now())
    while job.result is None:
        pass
    print(datetime.datetime.now())
    print("got result")

    # next step
    return next_step_func(job.result)

...

所以痛点是我需要 job.result 能够在 endpoint() 中返回,但作业回调会将我的作业带到 on_success 的不同上下文。

文档建议使用 job callbacks 作为选项:

def job_succeeded(job, connection, result, *args, **kwargs):
    next_step_func(job.result)

def job_failed(job, connection, type, value, traceback):
    # react to the error
    pass

with Connection(redis_connection):
    args = (func_input1, func_input2)
    job_queue.enqueue(worker_func, args=args, on_success=job_succeeded, on_failure=job_failed)