如何在Python-RQ中创建多个worker?

How to create multiple workers in Python-RQ?

我们最近被迫用 RQ 替换芹菜,因为它更简单,而且芹菜给我们带来了太多问题。现在,我们无法找到动态创建多个队列的方法,因为我们需要同时完成多个作业。所以基本上每个对我们其中一条路线的请求都应该开始一项工作,让多个用户等待一个用户的工作完成然后我们才能继续下一个工作是没有意义的。我们定期向服务器发送请求以获取作业的状态和一些元数据。这样我们就可以用进度条更新用户(这可能是一个漫长的过程,所以为了用户体验必须这样做)

我们正在使用 Django 和 Python 的 rq library. We are not using django-rq(如果使用它有优势,请告诉我)

到目前为止,我们在其中一个控制器中启动了一项任务,例如:

redis_conn = Redis()
q = Queue(connection=redis_conn)  
job = django_rq.enqueue(render_task, new_render.pk, domain=domain, data=csv_data, timeout=1200)

然后在我们的render_task方法中,我们根据长任务的状态向作业添加元数据:

current_job = get_current_job()
current_job.meta['state'] = 'PROGRESS'
current_job.meta['process_percent'] = process_percent
current_job.meta['message'] = 'YOUTUBE'
current_job.save()

现在我们有另一个端点获取当前任务及其元数据并将其传递回客户端(这是通过偶发 AJAX 请求发生的)

我们如何在不阻塞其他作业的情况下同时处理 运行 个作业?我们应该动态地创建队列吗?有没有办法利用 Worker 来实现这一目标?

据我所知,RQ 没有任何设施来管理多个工人。您必须启动一个新的工作进程来定义它将使用哪个队列。一种对我来说效果很好的方法是使用 Supervisor。在 supervisor 中,您可以为给定的队列和进程数配置您的 worker 以具有并发性。例如,您可以让队列 "high-priority" 有 5 个工人,队列 "low-priority" 有 1 个工人。

我想建议一个使用 django-rq 的非常简单的解决方案:

样本settings.py

...

RQ_QUEUES = {
    'default': {
        'HOST': os.getenv('REDIS_HOST', 'localhost'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    },
    'low': {
        'HOST': os.getenv('REDIS_HOST', 'localhost'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 360,
    }
}

...

运行配置

运行 python manage.py rqworker default low 次(例如,每次在它自己的 shell 或作为它自己的 Docker 容器)所需工人的数量.命令中队列的顺序决定了它们的优先级。此时,所有工作人员都在监听两个队列。

代码中

调用作业到运行时,传入需要的队列:

对于high/normal个优先级作业,可以不带任何参数调用,作业会进入默认队列。对于低优先级,您必须在作业级别指定:

@job('low')
def my_low_priority_job():
  # some code

然后调用my_low_priority_job.delay().

或者,在调用时确定优先级:

queue = django_rq.get_queue('low')
queue.enqueue(my_variable_priority_job)

对于 运行 多个工作人员来说,这不仅是可能的,而且是理想的。我使用 bash 文件作为启动命令进入虚拟环境,并使用自定义 Worker class 启动。

这是一个主管配置,在生产工作负载下对 RQ 工作人员来说效果很好。请注意,startretries 很高,因为这 运行s 在 AWS 上并且需要在部署期间重试。

[program:rq-workers]
process_name=%(program_name)s_%(process_num)02d
command=/usr/local/bin/start_rq_worker.sh
autostart=true
autorestart=true
user=root
numprocs=5
startretries=50
stopsignal=INT
killasgroup=true
stopasgroup=true
stdout_logfile=/opt/elasticbeanstalk/tasks/taillogs.d/super_logs.conf
redirect_stderr=true

start_rq_worker.sh

的内容
#!/bin/bash
date > /tmp/date
source /opt/python/run/venv/bin/activate
source /opt/python/current/env
/opt/python/run/venv/bin/python /opt/python/current/app/manage.py
rqworker --worker-class rq.SimpleWorker default