从多进程启动芹菜工人
Starting celery worker from multiprocessing
我是芹菜新手。我见过的所有示例都是从命令行启动 celery worker。例如:
$ celery -A proj worker -l info
我正在启动一个关于 elastic beanstalk 的项目,我认为让 worker 成为我的网络应用程序的子进程会很好。我尝试使用多处理,它似乎有效。我想知道这是否是个好主意,或者是否有一些缺点。
import celery
import multiprocessing
class WorkerProcess(multiprocessing.Process):
def __init__(self):
super().__init__(name='celery_worker_process')
def run(self):
argv = [
'worker',
'--loglevel=WARNING',
'--hostname=local',
]
app.worker_main(argv)
def start_celery():
global worker_process
worker_process = WorkerProcess()
worker_process.start()
def stop_celery():
global worker_process
if worker_process:
worker_process.terminate()
worker_process = None
worker_name = 'celery@local'
worker_process = None
app = celery.Celery()
app.config_from_object('celery_app.celeryconfig')
似乎是个不错的选择,绝对不是唯一的选择,而是一个不错的选择:)
您可能想要研究的一件事(您可能已经在这样做)是将自动缩放与 Celery 队列的大小相关联。因此,您只能在队列增长时进行扩展。
实际上 Celery 在内部当然做了类似的事情,所以没有太大区别。我能想到的唯一障碍是外部资源的处理(例如数据库连接),这可能是个问题,但完全取决于您使用 Celery 做什么。
如果有人感兴趣,我确实通过预配置的 AMI 服务器 运行 Python 3.4 在 Elastic Beanstalk 上实现了此功能。我在使用基于 Docker 的服务器 运行 Debian Jessie 时遇到了很多问题。也许与端口重新映射有关。 Docker 是一种黑盒子,我发现它很难使用和调试。幸运的是,AWS 的好心人刚刚在 2015 年 4 月 8 日添加了一个非 docker Python 3.4 选项。
我进行了大量搜索以部署和运行它。我看到很多问题没有答案。所以这是我部署的非常简单的 python 3.4/flask/celery 过程。
Celery 你可以直接 pip 安装。您需要使用配置命令或 container_command 从配置文件安装 rabbitmq。我在我上传的项目 zip 中使用脚本,因此需要 container_command 才能使用该脚本(常规 eb config 命令在项目安装之前发生)。
[yourapproot]/.ebextensions/05_install_rabbitmq.config:
container_commands:
01RunScript:
command: bash ./init_scripts/app_setup.sh
[yourapproot]/init_scripts/app_setup.sh:
#!/usr/bin/env bash
# Download and install Erlang
yum install erlang
# Download the latest RabbitMQ package using wget:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm
# Install rabbit
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum -y install rabbitmq-server-3.5.1-1.noarch.rpm
# Start server
/sbin/service rabbitmq-server start
我正在做一个烧瓶应用程序,所以我在第一个请求之前启动了工作人员:
@app.before_first_request
def before_first_request():
task_mgr.start_celery()
task_mgr 创建了 celery app 对象(我称之为 celery,因为 flask app 对象是 app)。对于简单的任务管理器,-Ofair 在这里非常关键。任务预取有各种奇怪的行为。这应该是默认设置?
task_mgr/task_mgr.py:
import celery as celery_module
import multiprocessing
class WorkerProcess(multiprocessing.Process):
def __init__(self):
super().__init__(name='celery_worker_process')
def run(self):
argv = [
'worker',
'--loglevel=WARNING',
'--hostname=local',
'-Ofair',
]
celery.worker_main(argv)
def start_celery():
global worker_process
multiprocessing.set_start_method('fork') # 'spawn' seems to work also
worker_process = WorkerProcess()
worker_process.start()
def stop_celery():
global worker_process
if worker_process:
worker_process.terminate()
worker_process = None
worker_name = 'celery@local'
worker_process = None
celery = celery_module.Celery()
celery.config_from_object('task_mgr.celery_config')
到目前为止我的配置非常简单:
task_mgr/celery_config.py:
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json
CELERY_RESULT_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json
CELERY_TASK_RESULT_EXPIRES = 18000 # Results hang around for 5 hours
CELERYD_CONCURRENCY = 4
然后你可以把任务放在你需要的地方:
from task_mgr.task_mgr import celery
import time
@celery.task(bind=True)
def error_task(self):
self.update_state(state='RUNNING')
time.sleep(10)
raise KeyError('im an error')
@celery.task(bind=True)
def long_task(self):
self.update_state(state='RUNNING')
time.sleep(20)
return 'long task finished'
@celery.task(bind=True)
def task_with_status(self, wait):
self.update_state(state='RUNNING')
for i in range(5):
time.sleep(wait)
self.update_state(
state='PROGRESS',
meta={
'current': i + 1,
'total': 5,
'status': 'progress',
'host': self.request.hostname,
}
)
time.sleep(wait)
return 'finished with wait = ' + str(wait)
我还保留了一个任务队列来保存异步结果,以便我可以监控任务:
task_queue = []
def queue_task(task, *args):
async_result = task.apply_async(args)
task_queue.append(
{
'task_name':task.__name__,
'task_args':args,
'async_result':async_result
}
)
return async_result
def get_tasks_info():
tasks = []
for task in task_queue:
task_name = task['task_name']
task_args = task['task_args']
async_result = task['async_result']
task_id = async_result.id
task_state = async_result.state
task_result_info = async_result.info
task_result = async_result.result
tasks.append(
{
'task_name': task_name,
'task_args': task_args,
'task_id': task_id,
'task_state': task_state,
'task_result.info': task_result_info,
'task_result': task_result,
}
)
return tasks
当然,在您需要的地方开始任务:
from webapp.app import app
from flask import url_for, render_template, redirect
from webapp import tasks
from task_mgr import task_mgr
@app.route('/start_all_tasks')
def start_all_tasks():
task_mgr.queue_task(tasks.long_task)
task_mgr.queue_task(tasks.error_task)
for i in range(1, 9):
task_mgr.queue_task(tasks.task_with_status, i * 2)
return redirect(url_for('task_status'))
@app.route('/task_status')
def task_status():
current_tasks = task_mgr.get_tasks_info()
return render_template(
'parse/task_status.html',
tasks=current_tasks
)
仅此而已。如果您需要任何帮助,请告诉我,尽管我的芹菜知识仍然相当有限。
我是芹菜新手。我见过的所有示例都是从命令行启动 celery worker。例如:
$ celery -A proj worker -l info
我正在启动一个关于 elastic beanstalk 的项目,我认为让 worker 成为我的网络应用程序的子进程会很好。我尝试使用多处理,它似乎有效。我想知道这是否是个好主意,或者是否有一些缺点。
import celery
import multiprocessing
class WorkerProcess(multiprocessing.Process):
def __init__(self):
super().__init__(name='celery_worker_process')
def run(self):
argv = [
'worker',
'--loglevel=WARNING',
'--hostname=local',
]
app.worker_main(argv)
def start_celery():
global worker_process
worker_process = WorkerProcess()
worker_process.start()
def stop_celery():
global worker_process
if worker_process:
worker_process.terminate()
worker_process = None
worker_name = 'celery@local'
worker_process = None
app = celery.Celery()
app.config_from_object('celery_app.celeryconfig')
似乎是个不错的选择,绝对不是唯一的选择,而是一个不错的选择:)
您可能想要研究的一件事(您可能已经在这样做)是将自动缩放与 Celery 队列的大小相关联。因此,您只能在队列增长时进行扩展。
实际上 Celery 在内部当然做了类似的事情,所以没有太大区别。我能想到的唯一障碍是外部资源的处理(例如数据库连接),这可能是个问题,但完全取决于您使用 Celery 做什么。
如果有人感兴趣,我确实通过预配置的 AMI 服务器 运行 Python 3.4 在 Elastic Beanstalk 上实现了此功能。我在使用基于 Docker 的服务器 运行 Debian Jessie 时遇到了很多问题。也许与端口重新映射有关。 Docker 是一种黑盒子,我发现它很难使用和调试。幸运的是,AWS 的好心人刚刚在 2015 年 4 月 8 日添加了一个非 docker Python 3.4 选项。
我进行了大量搜索以部署和运行它。我看到很多问题没有答案。所以这是我部署的非常简单的 python 3.4/flask/celery 过程。
Celery 你可以直接 pip 安装。您需要使用配置命令或 container_command 从配置文件安装 rabbitmq。我在我上传的项目 zip 中使用脚本,因此需要 container_command 才能使用该脚本(常规 eb config 命令在项目安装之前发生)。
[yourapproot]/.ebextensions/05_install_rabbitmq.config:
container_commands:
01RunScript:
command: bash ./init_scripts/app_setup.sh
[yourapproot]/init_scripts/app_setup.sh:
#!/usr/bin/env bash
# Download and install Erlang
yum install erlang
# Download the latest RabbitMQ package using wget:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm
# Install rabbit
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum -y install rabbitmq-server-3.5.1-1.noarch.rpm
# Start server
/sbin/service rabbitmq-server start
我正在做一个烧瓶应用程序,所以我在第一个请求之前启动了工作人员:
@app.before_first_request
def before_first_request():
task_mgr.start_celery()
task_mgr 创建了 celery app 对象(我称之为 celery,因为 flask app 对象是 app)。对于简单的任务管理器,-Ofair 在这里非常关键。任务预取有各种奇怪的行为。这应该是默认设置?
task_mgr/task_mgr.py:
import celery as celery_module
import multiprocessing
class WorkerProcess(multiprocessing.Process):
def __init__(self):
super().__init__(name='celery_worker_process')
def run(self):
argv = [
'worker',
'--loglevel=WARNING',
'--hostname=local',
'-Ofair',
]
celery.worker_main(argv)
def start_celery():
global worker_process
multiprocessing.set_start_method('fork') # 'spawn' seems to work also
worker_process = WorkerProcess()
worker_process.start()
def stop_celery():
global worker_process
if worker_process:
worker_process.terminate()
worker_process = None
worker_name = 'celery@local'
worker_process = None
celery = celery_module.Celery()
celery.config_from_object('task_mgr.celery_config')
到目前为止我的配置非常简单:
task_mgr/celery_config.py:
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json
CELERY_RESULT_SERIALIZER = 'json' # 'pickle' warning: can't use datetime in json
CELERY_TASK_RESULT_EXPIRES = 18000 # Results hang around for 5 hours
CELERYD_CONCURRENCY = 4
然后你可以把任务放在你需要的地方:
from task_mgr.task_mgr import celery
import time
@celery.task(bind=True)
def error_task(self):
self.update_state(state='RUNNING')
time.sleep(10)
raise KeyError('im an error')
@celery.task(bind=True)
def long_task(self):
self.update_state(state='RUNNING')
time.sleep(20)
return 'long task finished'
@celery.task(bind=True)
def task_with_status(self, wait):
self.update_state(state='RUNNING')
for i in range(5):
time.sleep(wait)
self.update_state(
state='PROGRESS',
meta={
'current': i + 1,
'total': 5,
'status': 'progress',
'host': self.request.hostname,
}
)
time.sleep(wait)
return 'finished with wait = ' + str(wait)
我还保留了一个任务队列来保存异步结果,以便我可以监控任务:
task_queue = []
def queue_task(task, *args):
async_result = task.apply_async(args)
task_queue.append(
{
'task_name':task.__name__,
'task_args':args,
'async_result':async_result
}
)
return async_result
def get_tasks_info():
tasks = []
for task in task_queue:
task_name = task['task_name']
task_args = task['task_args']
async_result = task['async_result']
task_id = async_result.id
task_state = async_result.state
task_result_info = async_result.info
task_result = async_result.result
tasks.append(
{
'task_name': task_name,
'task_args': task_args,
'task_id': task_id,
'task_state': task_state,
'task_result.info': task_result_info,
'task_result': task_result,
}
)
return tasks
当然,在您需要的地方开始任务:
from webapp.app import app
from flask import url_for, render_template, redirect
from webapp import tasks
from task_mgr import task_mgr
@app.route('/start_all_tasks')
def start_all_tasks():
task_mgr.queue_task(tasks.long_task)
task_mgr.queue_task(tasks.error_task)
for i in range(1, 9):
task_mgr.queue_task(tasks.task_with_status, i * 2)
return redirect(url_for('task_status'))
@app.route('/task_status')
def task_status():
current_tasks = task_mgr.get_tasks_info()
return render_template(
'parse/task_status.html',
tasks=current_tasks
)
仅此而已。如果您需要任何帮助,请告诉我,尽管我的芹菜知识仍然相当有限。