Celery - Only one instance per task/process?

In the Celery documentation, the Instantiation section (http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-task-classes) says:

A task is not instantiated for every request, but is registered in the task registry as a global instance.

This means that the __init__ constructor will only be called once per process, and that the task class is semantically closer to an Actor.
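To make the quoted behaviour concrete, here is a simplified, Celery-free model of a task registry. The registry dict, the register decorator and the Counter class are hypothetical stand-ins, not Celery's real internals: the class is instantiated once at registration time, and every later call reuses that single instance, so state set in __init__ persists between calls, much like an actor's.

```python
# Hypothetical, simplified model of a per-process task registry.
# This is NOT Celery's implementation; it only mimics the documented idea
# that each task class is instantiated once and then reused.

registry = {}      # task name -> the single global task instance
init_calls = []    # records every __init__ call, for demonstration


def register(name):
    """Class decorator: instantiate the task once and store the instance."""
    def decorator(cls):
        registry[name] = cls()   # the only place __init__ runs
        return cls
    return decorator


@register("counter")
class Counter(object):
    def __init__(self):
        init_calls.append(id(self))
        self.count = 0           # state that survives between calls

    def __call__(self):
        self.count += 1
        return self.count


def call_task(name):
    """Every 'request' is dispatched to the same registered instance."""
    return registry[name]()


if __name__ == "__main__":
    for _ in range(3):
        print(call_task("counter"))   # prints 1, then 2, then 3
    print(len(init_calls))            # prints 1: __init__ ran only once
```

Under this model, seeing __init__ run several times means several instances were created, which is what the question below is trying to explain.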

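However, when I run the following example, I see the __init__ method being called at least 3 times. What is wrong with my setup? CELERYD_CONCURRENCY = 1 should ensure that there is only one process per worker, right?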

$ celery -A proj beat

celery beat v3.1.17 (Cipater) is starting.
init Task1
40878160
x=1.0
init Task1
40878352
x=1.0
init Task1
40879312
x=1.0
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2015-02-05 23:05:21,875: INFO/MainProcess] beat: Starting...
[2015-02-05 23:05:21,971: INFO/MainProcess] Scheduler: Sending due task    task1-every-5-seconds (proj.tasks.t1)
[2015-02-05 23:05:26,972: INFO/MainProcess] Scheduler: Sending due task task1-every-5-seconds (proj.tasks.t1)
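The three different numbers printed after each "init Task1" line are the id() values of three separate Task1 objects. A quick way to tell whether they were created in one process or in several is to print os.getpid() next to id(self). A minimal, standard-library-only sketch, where DiagnosticTask is a hypothetical stand-in for Task1:

```python
import os

created = []  # (pid, object id) for every instance constructed


class DiagnosticTask(object):
    """Hypothetical stand-in for Task1 that logs which process built it."""

    def __init__(self):
        pid = os.getpid()
        created.append((pid, id(self)))
        print("init DiagnosticTask pid=%d id=%d" % (pid, id(self)))


if __name__ == "__main__":
    instances = [DiagnosticTask() for _ in range(3)]  # keep references alive
    pids = set(pid for pid, _ in created)
    # One PID with several ids means repeated instantiation in one process;
    # several PIDs mean the constructor ran in different processes.
    print("distinct processes: %d, instances: %d" % (len(pids), len(created)))
```

If every line shows the same PID, the class is being instantiated repeatedly inside a single process. Note that CELERYD_CONCURRENCY only configures the worker's pool, so it would not be expected to affect the celery beat process that produced the output above.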

celery.py:

from __future__ import absolute_import
from datetime import timedelta
from celery import Celery

app = Celery('proj',
             broker='amqp://guest@localhost//',
             backend='amqp://',
             include=['proj.tasks'])
app.conf.update(
    CELERY_REDIRECT_STDOUTS=True,
    CELERY_TASK_RESULT_EXPIRES=60,
    CELERYD_CONCURRENCY = 1,
    CELERYBEAT_SCHEDULE = {
        'task1-every-5-seconds': {
            'task': 'proj.tasks.t1',
            'schedule': timedelta(seconds=5)
            },
        },
    CELERY_TIMEZONE = 'GMT',
)

if __name__ == '__main__':
    app.start()

tasks.py:

from __future__ import absolute_import
from proj.celery import app
from celery import Task
import time

class Foo():
    def __init__(self, x):
        self.x = x

class Task1(Task):
    abstract = True
    def __init__(self):
        print("init Task1")
        print(id(self))
        self.f = Foo(1.0)
        print("x=1.0")

@app.task(base=Task1)
def t1():
    t1.f.x += 1
    print(t1.f.x)
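If the main concern is not building Foo more often than necessary, one option is to create it lazily on first access and cache it, along the lines of the DatabaseTask example in the same "custom task classes" section of the Celery docs. The sketch below is plain Python so it runs without a broker; LazyFooTask is a hypothetical name, and in a real project it would subclass celery.Task (with abstract = True) and be passed as base= to @app.task.

```python
class Foo(object):
    def __init__(self, x):
        self.x = x


class LazyFooTask(object):
    """Hypothetical base class; in Celery this would subclass celery.Task."""

    _f = None  # class-level default: no Foo has been created yet

    @property
    def f(self):
        # Build Foo only on first access, then reuse the cached object.
        if self._f is None:
            self._f = Foo(1.0)
        return self._f


if __name__ == "__main__":
    task = LazyFooTask()
    task.f.x += 1
    task.f.x += 1
    print(task.f.x)  # prints 3.0
```

Constructing the class stays cheap this way, so extra instantiations (for example in the beat process) do not create extra Foo objects; only an instance whose f is actually used pays for it.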

So, based on your comment, you need to maintain one connection per thread.

Then why not use thread-local storage? It should be a safe solution for you.

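from threading import local

from proj.celery import app  # the Celery app from the question

thread_storage = local()


def get_or_create_connection(*args, **kwargs):
    # Each thread sees its own attributes on thread_storage, so the
    # connection is created once per thread and reused afterwards.
    # Connection stands for whatever client class you use; it is not defined here.
    if not hasattr(thread_storage, 'connection'):
        thread_storage.connection = Connection(*args, **kwargs)
    return thread_storage.connection


@app.task()
def do_stuff():
    connection = get_or_create_connection('some', connection='args')
    connection.ping()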
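The thread-local idea can be checked without Celery or a real client. In this self-contained sketch, FakeConnection and worker are hypothetical placeholders: each thread asks for a connection twice and gets the same object back, while different threads get different objects.

```python
import threading

thread_storage = threading.local()


class FakeConnection(object):
    """Hypothetical placeholder for a real client connection."""

    def __init__(self, name):
        self.name = name

    def ping(self):
        return "pong from %s" % self.name


def get_or_create_connection(name):
    # The first call in a thread creates a connection; later calls in the
    # same thread return the one already stored on thread_storage.
    if not hasattr(thread_storage, "connection"):
        thread_storage.connection = FakeConnection(name)
    return thread_storage.connection


def worker(results, index):
    first = get_or_create_connection("thread-%d" % index)
    second = get_or_create_connection("ignored")  # reuses the same object
    results[index] = (first, second)


if __name__ == "__main__":
    results = {}
    threads = [threading.Thread(target=worker, args=(results, i)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    for i, (first, second) in sorted(results.items()):
        print(i, first is second, first.ping())
```

With Celery's default prefork pool each child is a separate process with its own memory, so every process also ends up with its own thread_storage; the per-thread part mainly matters when one process runs tasks in several threads.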