Peewee, SQLite and threading

I'm working on a threaded application in which one thread feeds a Queue with objects to be modified, and a number of other threads then read from the queue, make the modifications, and save the changes.

The application doesn't need a lot of concurrency, so I would like to stick with an SQLite database. Here is a small example illustrating the application:

import queue
import threading
import peewee as pw

db = pw.SqliteDatabase('test.db', threadlocals=True)

class Container(pw.Model):
    contents = pw.CharField(default="spam")

    class Meta:
        database = db


class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        containers = Container.select()

        for container in containers:
            self.q.put(container)


class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context() as ctx:
                # Get a new connection to the container object:
                container = Container.get(id=item.id)
                container.contents = "eggs"
                container.save()

            self.q.task_done()


if __name__ == "__main__":

    db.connect()
    try:
        db.create_tables([Container,])
    except pw.OperationalError:
        pass
    else:
        [Container.create() for c in range(42)]
    db.close()

    q = queue.Queue(maxsize=10)


    feeder = FeederThread(q)
    feeder.setDaemon(True)
    feeder.start()

    for i in range(10):
        reader = ReaderThread(q)
        reader.setDaemon(True)
        reader.start()

    q.join()

Based on the peewee documentation, SQLite should support multithreading. However, I keep getting the infamous peewee.OperationalError: database is locked error, with the traceback pointing to the container.save() line.

How can I work around this?

Have you tried WAL mode?
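For reference, here is a minimal sketch of switching the database file to WAL mode by issuing the PRAGMA through peewee's execute_sql(); the constructor arguments for setting pragmas differ between peewee versions, so the raw statement is shown as the least version-dependent approach:

import peewee as pw

db = pw.SqliteDatabase('test.db', threadlocals=True)
db.connect()
# journal_mode is persistent, so this only needs to run once per database file.
db.execute_sql('PRAGMA journal_mode=WAL;')
db.close()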

Improve INSERT-per-second performance of SQLite?

You have to be quite careful if you have concurrent access to SQLite, as the whole database is locked while a write is in progress; multiple readers are possible, but other writers are locked out until the write completes. This has been improved somewhat with the addition of WAL (write-ahead logging) in newer SQLite versions.

If you are using multiple threads, you can try using the shared page cache, which will allow loaded pages to be shared between threads, which can avoid expensive I/O calls.
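As a rough illustration of the shared page cache, SQLite exposes it through a file: URI filename; the sketch below uses the standard-library sqlite3 module directly, since how (or whether) a URI filename can be passed through peewee depends on the version:

import sqlite3

# Each thread opens its own connection, but cache=shared lets the loaded
# pages be shared between those connections within the process.
conn = sqlite3.connect('file:test.db?cache=shared', uri=True)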

I was also surprised to see this fail, so I copied your code and tried a few different ideas. I think the problem is that, by default, ExecutionContext() causes the wrapped block to run in a transaction. To avoid that, I passed in False in the reader threads.

I also edited the feeder to evaluate the SELECT statement up front (list(Container.select())) before putting things into the queue.

The following works for me locally:

class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super(FeederThread, self).__init__()

        self.q = input_queue

    def run(self):
        containers = list(Container.select())

        for container in containers:
            self.q.put(container.id)  # I don't like passing model instances around like this, personal preference though

class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super(ReaderThread, self).__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context(False):
                # Get a new connection to the container object:
                container = Container.get(id=item)
                container.contents = "nuggets"
                with db.atomic():
                    container.save()

            self.q.task_done()

if __name__ == "__main__":

    with db.execution_context():
        try:
            db.create_tables([Container,])
        except pw.OperationalError:
            pass
        else:
            [Container.create() for c in range(42)]

    # ... same ...

I'm not wholly satisfied with this, but hopefully it gives you some ideas.

Here is a blog post I wrote a while back with some tips on getting higher concurrency with SQLite: http://charlesleifer.com/blog/sqlite-small-fast-reliable-choose-any-three-/
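Alongside WAL, the other setting that usually comes up in that context is a busy timeout, so that a locked database makes the writer wait instead of failing immediately. A small sketch (the 5000 ms value is just an example):

import peewee as pw

db = pw.SqliteDatabase('test.db', threadlocals=True)

def set_busy_timeout():
    # busy_timeout is a per-connection setting, so with threadlocals=True each
    # thread would need to run this on its own connection after connecting.
    db.execute_sql('PRAGMA busy_timeout=5000;')  # wait up to 5 s for locks

db.connect()
set_busy_timeout()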