用于 celery 的原始 PyMySQL 多线程

Mutlithreading with raw PyMySQL for celery

在我目前正在做的项目中,我不允许使用ORM所以我做了my own

它工作得很好,但我在使用 Celery 时遇到了问题,它的并发性。有一段时间,我将它设置为 1(使用 --concurrency=1),但我添加了 new tasks,这比使用芹菜 运行 需要更多的时间来处理节拍,导致大量任务积压

当我将 celery 的并发设置为 > 1 时,会发生以下情况(pastebin 因为它很大):

https://pastebin.com/M4HZXTDC

关于如何在其他进程上实施某种 lock/wait 以使不同的工作人员不会相互交叉,我有什么想法吗?

编辑:这是我设置 PyMySQL instance and how the open and close are handled

的地方

PyMSQL不允许线程共享同一个连接(模块可以共享,但是线程不能共享一个连接)。您的模型 class 到处都是 reusing the same connection

因此,当不同的工作人员调用模型进行查询时,他们使用相同的连接对象,从而导致冲突。

确保您的连接对象是 thread-local。不要使用 db class 属性,而是考虑一种方法来检索 thread-local 连接对象,而不是重用可能在不同线程中创建的连接对象。

例如,.

现在,您正在为每个模型到处使用全球连接。

# Connect to the database
connection = pymysql.connect(**database_config)


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    db = connection

为避免这种情况,您可以改为在 __init__ 方法中创建数据库...

class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    def __init__(self, *args, **kwargs):
        self.db = pymysql.connect(**database_config)

但是,这可能不是 efficient/practical,因为 db 对象的每个实例都会创建一个会话。

要对此进行改进,您可以使用一种使用 threading.local 的方法来保持线程的本地连接。



class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """
    _conn = threading.local()
    @property
    def db(self):
        if not hasattr(self._conn, 'db'):
            self._conn.db = pymysql.connect(**database_config)
        return self._conn.db

请注意,thread-local 解决方案在假设您使用线程并发模型的情况下有效。另请注意,celery 默认使用多个进程(prefork)。这可能是也可能不是问题。如果这是一个问题,您可以改为 change the workers to use eventlet 来解决它。