ZODB 提交卡住并使我的整个应用程序冻结

ZODB commit stuck and make my whole application to freeze

今天我在使用 ZODB 的 python 应用程序中发现了一个错误。 试图找出我的应用程序冻结的原因,我认为是 ZODB 造成的。

将日志记录设置为调试,似乎在提交时,ZODB 会找到 2 个连接,然后开始冻结。

INFO:ZEO.ClientStorage:('127.0.0.1', 8092) Connected to storage: ('localhost', 8092)
DEBUG:txn.140661100980032:new transaction
DEBUG:txn.140661100980032:commit
DEBUG:ZODB.Connection:Committing savepoints of size 1858621925
DEBUG:discord.gateway:Keeping websocket alive with sequence 59.
DEBUG:txn.140661100980032:commit <Connection at 7fee2d080fd0>
DEBUG:txn.140661100980032:commit <Connection at 7fee359e5cc0>

由于我是 ZODB 初学者,有没有关于如何解决/如何深入挖掘的想法?

这似乎与并发提交有关。

我认为打开一个新连接会启动一个专用的事务管理器,但事实并非如此。在未指定事务管理器的情况下启动新连接时,将使用本地连接(与线程上的其他连接共享)。

我的代码:

async def get_connection():
    return ZEO.connection(8092)

async def _message_db_init_aux(self, channel, after=None, before=None):
    connexion = await get_connection()
    root = connexion.root()

    messages = await some_function_which_return_a_list()

    async for message in messages:
        # If author.id doesn't exist on the data, let's initiate it as a Tree
        if message.author.id not in root.data: # root.data is a BTrees.OOBTree.BTree()
            root.data[message.author.id] = BTrees.OOBTree.BTree()

        # Message is a defined classed inherited from persistant.Persistant
        root.data[message.author.id][message.id] = Message(message.id, message.author.id, message.created_at)

    transaction.commit()
    connexion.close()

如果不精确,则使用本地事务管理器。 如果您在同一线程上打开多个连接,则必须确定要使用的事务管理器。默认

 transaction.commit()

是本地事务管理器。

 connection.transaction.manager.commit()

将使用专用于事务的事务管理器(而不是本地事务管理器)。

更多信息,请查看http://www.zodb.org/en/latest/guide/transactions-and-threading.html

不要跨连接重复使用事务管理器。每个连接都有自己的事务管理器,使用那个

您的代码当前创建连接,然后提交。与其创建连接,不如让数据库为您创建一个事务管理器,然后由它管理自己的连接。事务管理器可以用作 上下文管理器,这意味着当上下文结束时,对数据库的更改会自动提交。

此外,通过对每个事务使用 ZEO.connection() ,您将迫使 ZEO 创建一个全新的客户端对象,其中包含全新的缓存和连接池。通过使用 ZEO.DB() 代替,并缓存结果,创建了一个客户端,可以从中合并和重用连接,并使用本地缓存来加速事务。

我会将代码更改为:

def get_db():
    """Access the ZEO database client.

    The database client is cached to take advantage of caching and connection pooling

    """
    db = getattr(get_db, 'db', None)
    if db is None:
        get_db.db = db = ZEO.DB(8092)
    return db

async def _message_db_init_aux(self, channel, after=None, before=None):
    with self.get_db().transaction() as conn:
        root = conn.root()

        messages = await some_function_which_return_a_list()

        async for message in messages:
            # If author.id doesn't exist on the data, let's initiate it as a Tree
            if message.author.id not in root.data: # root.data is a BTrees.OOBTree.BTree()
                root.data[message.author.id] = BTrees.OOBTree.BTree()

            # Message is a defined classed inherited from persistant.Persistant
            root.data[message.author.id][message.id] = Message(
                message.id, message.author.id, message.created_at
            )

数据库对象上的 .transaction() 方法在后台创建一个新连接,在进入上下文时(with 导致 __enter__ 被调用),并且当with 块结束事务已提交,连接再次释放到池中。

请注意,我使用了同步 def get_db() 方法; ZEO 客户端代码上的调用签名是完全同步的。从异步代码调用它们是安全的,因为 在幕后 ,实现始终使用 asyncio,在同一循环中使用回调和任务,而实际 I/O 是推迟到单独的任务。