用于 celery 的原始 PyMySQL 多线程
Mutlithreading with raw PyMySQL for celery
在我目前正在做的项目中,我不允许使用ORM所以我做了my own
它工作得很好,但我在使用 Celery 时遇到了问题,它的并发性。有一段时间,我将它设置为 1
(使用 --concurrency=1
),但我添加了 new tasks,这比使用芹菜 运行 需要更多的时间来处理节拍,导致大量任务积压
当我将 celery 的并发设置为 > 1 时,会发生以下情况(pastebin 因为它很大):
关于如何在其他进程上实施某种 lock/wait 以使不同的工作人员不会相互交叉,我有什么想法吗?
编辑:这是我设置 PyMySQL instance and how the open and close are handled
的地方
PyMSQL不允许线程共享同一个连接(模块可以共享,但是线程不能共享一个连接)。您的模型 class 到处都是 reusing the same connection。
因此,当不同的工作人员调用模型进行查询时,他们使用相同的连接对象,从而导致冲突。
确保您的连接对象是 thread-local。不要使用 db
class 属性,而是考虑一种方法来检索 thread-local 连接对象,而不是重用可能在不同线程中创建的连接对象。
例如,.
现在,您正在为每个模型到处使用全球连接。
# Connect to the database
connection = pymysql.connect(**database_config)
class Model(object):
"""
Base Model class, all other Models will inherit from this
"""
db = connection
为避免这种情况,您可以改为在 __init__
方法中创建数据库...
class Model(object):
"""
Base Model class, all other Models will inherit from this
"""
def __init__(self, *args, **kwargs):
self.db = pymysql.connect(**database_config)
但是,这可能不是 efficient/practical,因为 db 对象的每个实例都会创建一个会话。
要对此进行改进,您可以使用一种使用 threading.local
的方法来保持线程的本地连接。
class Model(object):
"""
Base Model class, all other Models will inherit from this
"""
_conn = threading.local()
@property
def db(self):
if not hasattr(self._conn, 'db'):
self._conn.db = pymysql.connect(**database_config)
return self._conn.db
请注意,thread-local 解决方案在假设您使用线程并发模型的情况下有效。另请注意,celery 默认使用多个进程(prefork)。这可能是也可能不是问题。如果这是一个问题,您可以改为 change the workers to use eventlet 来解决它。
在我目前正在做的项目中,我不允许使用ORM所以我做了my own
它工作得很好,但我在使用 Celery 时遇到了问题,它的并发性。有一段时间,我将它设置为 1
(使用 --concurrency=1
),但我添加了 new tasks,这比使用芹菜 运行 需要更多的时间来处理节拍,导致大量任务积压
当我将 celery 的并发设置为 > 1 时,会发生以下情况(pastebin 因为它很大):
关于如何在其他进程上实施某种 lock/wait 以使不同的工作人员不会相互交叉,我有什么想法吗?
编辑:这是我设置 PyMySQL instance and how the open and close are handled
的地方PyMSQL不允许线程共享同一个连接(模块可以共享,但是线程不能共享一个连接)。您的模型 class 到处都是 reusing the same connection。
因此,当不同的工作人员调用模型进行查询时,他们使用相同的连接对象,从而导致冲突。
确保您的连接对象是 thread-local。不要使用 db
class 属性,而是考虑一种方法来检索 thread-local 连接对象,而不是重用可能在不同线程中创建的连接对象。
例如,
现在,您正在为每个模型到处使用全球连接。
# Connect to the database
connection = pymysql.connect(**database_config)
class Model(object):
"""
Base Model class, all other Models will inherit from this
"""
db = connection
为避免这种情况,您可以改为在 __init__
方法中创建数据库...
class Model(object):
"""
Base Model class, all other Models will inherit from this
"""
def __init__(self, *args, **kwargs):
self.db = pymysql.connect(**database_config)
但是,这可能不是 efficient/practical,因为 db 对象的每个实例都会创建一个会话。
要对此进行改进,您可以使用一种使用 threading.local
的方法来保持线程的本地连接。
class Model(object):
"""
Base Model class, all other Models will inherit from this
"""
_conn = threading.local()
@property
def db(self):
if not hasattr(self._conn, 'db'):
self._conn.db = pymysql.connect(**database_config)
return self._conn.db
请注意,thread-local 解决方案在假设您使用线程并发模型的情况下有效。另请注意,celery 默认使用多个进程(prefork)。这可能是也可能不是问题。如果这是一个问题,您可以改为 change the workers to use eventlet 来解决它。