如何在 Rust SQLx 中测试两个并行事务?

How to test two parallel transactions in Rust SQLx?

我正在试验 Rocket、Rust 和 SQLx,我想测试当两个并行事务试图在我的 table 上插入重复记录时会发生什么。

我的 insert fn 没有什么特别之处,而且工作正常:

async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
    EX: 'ex + Executor<'ex, Database = Postgres>,
{
    sqlx::query!(
        r#"INSERT INTO credentials (username, password)
        VALUES (, crypt(, gen_salt('bf')))"#,
        credentials.username,
        credentials.password,
    )
    .execute(executor)
    .await
    .map(|result| result.rows_affected())
    .map_err(|err| err.into())
}

不过,我的测试无限期挂起,因为它等待从未发生的提交:

#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
    let repo = new_repo();
    let db: Pool<Postgres> = connect().await;
    let credentials = new_random_credentials();

    println!("TX1 begins");
    let mut tx1 = db.begin().await.unwrap();
    let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
    assert_eq!(rows_affected, 1);

    println!("TX2 begins");
    let mut tx2 = db.begin().await.unwrap();
    println!("It hangs on the next line");
    let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
    assert_eq!(rows_affected, 1);
    
    println!("It never reaches this line");
    tx1.commit().await.unwrap();
    tx2.commit().await.unwrap();
}

如何并行创建和执行这些 TX,以便在尝试提交第二个 TX 时断言通过但测试失败?

作为参考,这是我的 Cargo.toml

[package]
name = "auth"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"

# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }

[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]

[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]

[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]

## DEV ##

[dev-dependencies]
mockall = "0.11.0"

[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]

您可以使用 async_std::future::timeout or tokio::time::timeout。使用 async_std 的示例:

use async_std::future;
use std::time::Duration;

let max_duration = Duration::from_millis(100);
assert!(timeout(max_duration, tx2.commit()).await.is_err());

如果你想在完成tx1之前继续tx2,你可以先async_std::task::spawn or tokio::spawn tx1:

async_std::task::spawn(async move {
    assert!(tx1.commit().await.is_ok());
});

@Mika 为我指出了正确的方向,我可以生成两个事务并添加一些超时时间来为并发 TX 提供一些时间来执行。

    let handle1 = tokio::spawn(async move {
        let repo = new_repo();
        let mut tx = db1.begin().await.unwrap();
        let rows_affected = repo.insert_credentials(&mut tx, &credentials1).await.unwrap();
        assert_eq!(rows_affected, 1);
        tokio::time::sleep(Duration::from_millis(100)).await;
        tx.commit().await.unwrap()
    });
    
    let handle2 = tokio::spawn(async move {
        let repo = new_repo();
        let mut tx = db2.begin().await.unwrap();
        let rows_affected = repo.insert_credentials(&mut tx, &credentials2).await.unwrap();
        assert_eq!(rows_affected, 1);
        tokio::time::sleep(Duration::from_millis(100)).await;
        tx.commit().await.unwrap()
    });

    let (_first, _second) = rocket::tokio::try_join!(handle1, handle2).unwrap();

我认为这样两个 TX 将并行执行直到睡眠线,然后一个提交,另一个在提交线上失败。但是不,实际上两个 TX 是并行执行的,TX1 一直运行到睡眠,TX2 在插入行上阻塞直到 TX1 提交,然后 TX2 在插入行上失败。

我想这就是 DB 在这种情况下的工作方式,也许我可以通过扰乱 TX 隔离来改变它,但这不是我的意图。我玩游戏是为了学习更多,今天的学习就够了:)