如何将 Diesel 与 SQLite 连接一起使用并避免“数据库已锁定”类型的错误

How to use Diesel with SQLite connections and avoid `database is locked` type of errors

在我的 Rust 应用程序中,我使用 DieselSQLite 数据库进行交互。我有多个线程可以同时查询数据库,我正在使用 crate r2d2 创建一个连接池。

我遇到的问题是我无法同时查询数据库。如果我尝试这样做,我总是会收到错误 database is locked,这是不可恢复的(即使只有一个线程在查询,任何后续请求也会因相同的错误而失败)。

以下代码重现了该问题。

# Cargo.toml
[dependencies]
crossbeam = { version = "0.7.1" }
diesel = { version = "1.4.2", features = ["sqlite", "r2d2"] }
-- The database table
CREATE TABLE users (
    name TEXT PRIMARY KEY NOT NULL
);
#[macro_use]
extern crate diesel;

mod schema;

use crate::schema::*;
use crossbeam;
use diesel::r2d2::{ConnectionManager, Pool};
use diesel::RunQueryDsl;
use diesel::{ExpressionMethods, SqliteConnection};

#[derive(Insertable, Queryable, Debug, Clone)]
#[table_name = "users"]
struct User {
    name: String,
}

fn main() {
    let db_url = "test.sqlite3";
    let pool = Pool::builder()
        .build(ConnectionManager::<SqliteConnection>::new(db_url))
        .unwrap();

    crossbeam::scope(|scope| {
        let pool2 = pool.clone();
        scope.spawn(move |_| {
            let conn = pool2.get().unwrap();
            for i in 0..100 {
                let name = format!("John{}", i);
                diesel::delete(users::table)
                    .filter(users::name.eq(&name))
                    .execute(&conn)
                    .unwrap();
            }
        });

        let conn = pool.get().unwrap();
        for i in 0..100 {
            let name = format!("John{}", i);
            diesel::insert_into(users::table)
                .values(User { name })
                .execute(&conn)
                .unwrap();
        }
    })
    .unwrap();
}

这是应用程序崩溃时显示的错误:

thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: DatabaseError(__Unknown, "database is locked")'

AFAIK,我应该可以使用多线程的连接池(即多个线程的多个连接),如 r2d2_sqlite crate example.[=23= 所示]

此外,我在系统中安装的sqlite3库支持Serialized线程模型,来自here:

In serialized mode, SQLite can be safely used by multiple threads with no restriction.

如何避免 database is locked 错误?另外,如果由于任何原因无法避免这些错误,我该如何解锁数据库?

最近我也无意中发现了这个问题。这是我的发现。

SQLite 支持多个编写器。

来自文档:

When SQLite tries to access a file that is locked by another process, the default behavior is to return SQLITE_BUSY.

那么如何绕过这个限制呢?我看到了两种解决方案。

忙超时

您可以多次重试查询,直到获得锁。 事实上SQLite提供了built-in mechanism。 您可以指示 SQLite 多次尝试锁定数据库。

现在您唯一需要做的就是以某种方式将此 pragma 传递给 SQLite。 幸运的是 diesel::r2d2 提供了一种简单的方法来通过新建立的连接的初始设置:

#[derive(Debug)]
pub struct ConnectionOptions {
    pub enable_wal: bool,
    pub enable_foreign_keys: bool,
    pub busy_timeout: Option<Duration>,
}

impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
    for ConnectionOptions
{
    fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
        (|| {
            if self.enable_wal {
                conn.batch_execute("PRAGMA journal_mode = WAL; PRAGMA synchronous = NORMAL;")?;
            }
            if self.enable_foreign_keys {
                conn.batch_execute("PRAGMA foreign_keys = ON;")?;
            }
            if let Some(d) = self.busy_timeout {
                conn.batch_execute(&format!("PRAGMA busy_timeout = {};", d.as_millis()))?;
            }
            Ok(())
        })()
        .map_err(diesel::r2d2::Error::QueryError)
    }
}

// ------------- Example -----------------

    let pool = Pool::builder()
        .max_size(16)
        .connection_customizer(Box::new(ConnectionOptions {
            enable_wal: true,
            enable_foreign_keys: true,
            busy_timeout: Some(Duration::from_secs(30)),
        }))
        .build(ConnectionManager::<SqliteConnection>::new(db_url))
        .unwrap();

WAL 模式

您可能要使用的第二个变体是 WAL 模式。它通过让读者和作者同时工作来提高并发性(WAL 模式比默认日志模式快很多)。 但是请注意,繁忙超时是 still required 才能使所有这些正常工作。

(请同时阅读 "synchronous" 模式设置为“正常”的后果。)

SQLITE_BUSY_SNAPSHOT 是 WAL 模式下可能发生的事情。但是有一个简单的补救方法 - 使用 BEGIN IMMEDIATE 以写入模式启动事务。

这样你就可以拥有多个readers/writers,这让生活更轻松。多个写者使用锁定机制(通过busy_timeout),所以此时只有一个活跃的写者。您当然不希望将连接限定为读写并在您的应用程序中手动进行锁定,例如Mutex.

我发现设置 r2d2::Pool::builder().max_size(1) 可以解决问题,但是你需要小心你的连接管理,不要问你是否已经有一个,例如:

fn create(pool: &DbPool, data: User) {
    let conn = pool.get().unwrap(); // One connection
    if !exist(pool, data) { // Two connection
        diesel::insert_into(users::table)
            .values(User { name: data.name })
            .execute(&conn)
            .unwrap();
    }
}

fn exist(pool: &DbPool, data: User) -> bool {
    let conn = pool.get().unwrap();
    Ok(
        select(exists(users::table.filter(col_user_name.eq(data.name))))
            .get_result(&conn)
            .unwrap(),
    )
}

我删除了所有 let conn = pool.get().unwrap(); 并且只尝试将其放入请求中。

diesel::insert_into(users::table)
    .values(User { name })
    .execute(&pool.get().unwrap())
    .unwrap();