为每个芹菜工人创建单独的数据库连接

Creating separate database connection for every celery worker

当工作人员在创建后执行任务时,我一直 运行 陷入奇怪的 mysql 问题。

我们使用 django 1.3、celery 3.1.17、djorm-ext-pool 0.5

我们以并发 3 启动 celery 进程。 到目前为止,我的观察是,当工作进程启动时,它们都获得相同的 mysql 连接。我们记录数据库连接 ID 如下。

from django.db import connection
connection.cursor()
logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id()))

当所有工作人员都获得任务时,第一个成功执行但另外两个出现奇怪的 Mysql 错误。它要么出现 "Mysql server gone away" 错误,要么出现 Django 抛出 "DoesNotExist" 错误的情况。显然 Django 正在查询的对象确实存在。

出现这个错误后,每个工作人员开始获得自己的数据库连接,之后我们没有发现任何问题。

celery 的默认行为是什么?它是否旨在共享相同的数据库连接。如果是这样,如何处理进程间通信? 理想情况下,我希望每个工作人员都有不同的数据库连接。

我尝试了下面提到的代码 link 但没有用。 Celery Worker Database Connection Pooling

我们还修复了下面建议的芹菜代码。 https://github.com/celery/celery/issues/2453

对于那些对问题投反对票的人,请告诉我投反对票的原因。

Celery 使用以下命令启动

celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue

myproject.py 作为主进程的一部分,在分叉工作进程之前对 mysql 数据库进行了一些查询。

作为主进程中查询流程的一部分,django ORM 会创建一个 sqlalchemy 连接池(如果它尚不存在)。然后创建工作进程。

Celery 作为 Django 修复的一部分会关闭现有连接。

    def close_database(self, **kwargs):
    if self._close_old_connections:
        return self._close_old_connections()  # Django 1.6
    if not self.db_reuse_max:
        return self._close_database()
    if self._db_recycles >= self.db_reuse_max * 2:
        self._db_recycles = 0
        self._close_database()
    self._db_recycles += 1

实际上可能发生的情况是,具有一个未使用的数据库连接的 sqlalchemy 池对象在分叉时被复制到 3 个工作进程。所以3个不同的池有3个连接对象指向同一个连接文件描述符。

工作人员在请求数据库连接时执行任务时,所有工作人员都从 sqlalchemy 池中获得相同的未使用连接,因为该连接当前未使用。所有连接都指向同一个文件描述符这一事实导致 MySQL 连接消失错误。

之后创建的新连接都是新的,并不指向相同的套接字文件描述符。

解决方案:

在主进程中添加

from django.db import connection
connection.cursor()

在任何导入完成之前。即甚至在添加 djorm-ext-pool 模块之前。

这样所有的数据库查询都将使用 django 在池外创建的连接。当 celery django fixup 关闭连接时,连接实际上被关闭,而不是返回炼金术池,在分叉时处理所有工作人员时,炼金术池中没有任何连接。之后,当工作人员请求数据库连接时,sqlalchemy returns 是新创建的连接之一。