为每个芹菜工人创建单独的数据库连接
Creating separate database connection for every celery worker
当工作人员在创建后执行任务时,我一直 运行 陷入奇怪的 mysql 问题。
我们使用 django 1.3、celery 3.1.17、djorm-ext-pool 0.5
我们以并发 3 启动 celery 进程。
到目前为止,我的观察是,当工作进程启动时,它们都获得相同的 mysql 连接。我们记录数据库连接 ID 如下。
from django.db import connection
connection.cursor()
logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id()))
当所有工作人员都获得任务时,第一个成功执行但另外两个出现奇怪的 Mysql 错误。它要么出现 "Mysql server gone away" 错误,要么出现 Django 抛出 "DoesNotExist" 错误的情况。显然 Django 正在查询的对象确实存在。
出现这个错误后,每个工作人员开始获得自己的数据库连接,之后我们没有发现任何问题。
celery 的默认行为是什么?它是否旨在共享相同的数据库连接。如果是这样,如何处理进程间通信?
理想情况下,我希望每个工作人员都有不同的数据库连接。
我尝试了下面提到的代码 link 但没有用。
Celery Worker Database Connection Pooling
我们还修复了下面建议的芹菜代码。
https://github.com/celery/celery/issues/2453
对于那些对问题投反对票的人,请告诉我投反对票的原因。
Celery 使用以下命令启动
celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue
myproject.py
作为主进程的一部分,在分叉工作进程之前对 mysql 数据库进行了一些查询。
作为主进程中查询流程的一部分,django ORM 会创建一个 sqlalchemy 连接池(如果它尚不存在)。然后创建工作进程。
Celery 作为 Django 修复的一部分会关闭现有连接。
def close_database(self, **kwargs):
if self._close_old_connections:
return self._close_old_connections() # Django 1.6
if not self.db_reuse_max:
return self._close_database()
if self._db_recycles >= self.db_reuse_max * 2:
self._db_recycles = 0
self._close_database()
self._db_recycles += 1
实际上可能发生的情况是,具有一个未使用的数据库连接的 sqlalchemy 池对象在分叉时被复制到 3 个工作进程。所以3个不同的池有3个连接对象指向同一个连接文件描述符。
工作人员在请求数据库连接时执行任务时,所有工作人员都从 sqlalchemy 池中获得相同的未使用连接,因为该连接当前未使用。所有连接都指向同一个文件描述符这一事实导致 MySQL 连接消失错误。
之后创建的新连接都是新的,并不指向相同的套接字文件描述符。
解决方案:
在主进程中添加
from django.db import connection
connection.cursor()
在任何导入完成之前。即甚至在添加 djorm-ext-pool
模块之前。
这样所有的数据库查询都将使用 django 在池外创建的连接。当 celery django fixup 关闭连接时,连接实际上被关闭,而不是返回炼金术池,在分叉时处理所有工作人员时,炼金术池中没有任何连接。之后,当工作人员请求数据库连接时,sqlalchemy returns 是新创建的连接之一。
当工作人员在创建后执行任务时,我一直 运行 陷入奇怪的 mysql 问题。
我们使用 django 1.3、celery 3.1.17、djorm-ext-pool 0.5
我们以并发 3 启动 celery 进程。 到目前为止,我的观察是,当工作进程启动时,它们都获得相同的 mysql 连接。我们记录数据库连接 ID 如下。
from django.db import connection
connection.cursor()
logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id()))
当所有工作人员都获得任务时,第一个成功执行但另外两个出现奇怪的 Mysql 错误。它要么出现 "Mysql server gone away" 错误,要么出现 Django 抛出 "DoesNotExist" 错误的情况。显然 Django 正在查询的对象确实存在。
出现这个错误后,每个工作人员开始获得自己的数据库连接,之后我们没有发现任何问题。
celery 的默认行为是什么?它是否旨在共享相同的数据库连接。如果是这样,如何处理进程间通信? 理想情况下,我希望每个工作人员都有不同的数据库连接。
我尝试了下面提到的代码 link 但没有用。 Celery Worker Database Connection Pooling
我们还修复了下面建议的芹菜代码。 https://github.com/celery/celery/issues/2453
对于那些对问题投反对票的人,请告诉我投反对票的原因。
Celery 使用以下命令启动
celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue
myproject.py
作为主进程的一部分,在分叉工作进程之前对 mysql 数据库进行了一些查询。
作为主进程中查询流程的一部分,django ORM 会创建一个 sqlalchemy 连接池(如果它尚不存在)。然后创建工作进程。
Celery 作为 Django 修复的一部分会关闭现有连接。
def close_database(self, **kwargs):
if self._close_old_connections:
return self._close_old_connections() # Django 1.6
if not self.db_reuse_max:
return self._close_database()
if self._db_recycles >= self.db_reuse_max * 2:
self._db_recycles = 0
self._close_database()
self._db_recycles += 1
实际上可能发生的情况是,具有一个未使用的数据库连接的 sqlalchemy 池对象在分叉时被复制到 3 个工作进程。所以3个不同的池有3个连接对象指向同一个连接文件描述符。
工作人员在请求数据库连接时执行任务时,所有工作人员都从 sqlalchemy 池中获得相同的未使用连接,因为该连接当前未使用。所有连接都指向同一个文件描述符这一事实导致 MySQL 连接消失错误。
之后创建的新连接都是新的,并不指向相同的套接字文件描述符。
解决方案:
在主进程中添加
from django.db import connection
connection.cursor()
在任何导入完成之前。即甚至在添加 djorm-ext-pool
模块之前。
这样所有的数据库查询都将使用 django 在池外创建的连接。当 celery django fixup 关闭连接时,连接实际上被关闭,而不是返回炼金术池,在分叉时处理所有工作人员时,炼金术池中没有任何连接。之后,当工作人员请求数据库连接时,sqlalchemy returns 是新创建的连接之一。