如何在 Rust SQLx 中测试两个并行事务?
How to test two parallel transactions in Rust SQLx?
我正在试验 Rocket、Rust 和 SQLx,我想测试当两个并行事务试图在我的 table 上插入重复记录时会发生什么。
我的 insert fn 没有什么特别之处,而且工作正常:
async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
EX: 'ex + Executor<'ex, Database = Postgres>,
{
sqlx::query!(
r#"INSERT INTO credentials (username, password)
VALUES (, crypt(, gen_salt('bf')))"#,
credentials.username,
credentials.password,
)
.execute(executor)
.await
.map(|result| result.rows_affected())
.map_err(|err| err.into())
}
不过,我的测试无限期挂起,因为它等待从未发生的提交:
#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
let repo = new_repo();
let db: Pool<Postgres> = connect().await;
let credentials = new_random_credentials();
println!("TX1 begins");
let mut tx1 = db.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("TX2 begins");
let mut tx2 = db.begin().await.unwrap();
println!("It hangs on the next line");
let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("It never reaches this line");
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
}
如何并行创建和执行这些 TX,以便在尝试提交第二个 TX 时断言通过但测试失败?
作为参考,这是我的 Cargo.toml
[package]
name = "auth"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"
# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }
[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]
[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]
[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]
## DEV ##
[dev-dependencies]
mockall = "0.11.0"
[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]
您可以使用 async_std::future::timeout
or tokio::time::timeout
。使用 async_std 的示例:
use async_std::future;
use std::time::Duration;
let max_duration = Duration::from_millis(100);
assert!(timeout(max_duration, tx2.commit()).await.is_err());
如果你想在完成tx1
之前继续tx2
,你可以先async_std::task::spawn
or tokio::spawn
tx1:
async_std::task::spawn(async move {
assert!(tx1.commit().await.is_ok());
});
@Mika 为我指出了正确的方向,我可以生成两个事务并添加一些超时时间来为并发 TX 提供一些时间来执行。
let handle1 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db1.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials1).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let handle2 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db2.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials2).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let (_first, _second) = rocket::tokio::try_join!(handle1, handle2).unwrap();
我认为这样两个 TX 将并行执行直到睡眠线,然后一个提交,另一个在提交线上失败。但是不,实际上两个 TX 是并行执行的,TX1 一直运行到睡眠,TX2 在插入行上阻塞直到 TX1 提交,然后 TX2 在插入行上失败。
我想这就是 DB 在这种情况下的工作方式,也许我可以通过扰乱 TX 隔离来改变它,但这不是我的意图。我玩游戏是为了学习更多,今天的学习就够了:)
我正在试验 Rocket、Rust 和 SQLx,我想测试当两个并行事务试图在我的 table 上插入重复记录时会发生什么。
我的 insert fn 没有什么特别之处,而且工作正常:
async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
EX: 'ex + Executor<'ex, Database = Postgres>,
{
sqlx::query!(
r#"INSERT INTO credentials (username, password)
VALUES (, crypt(, gen_salt('bf')))"#,
credentials.username,
credentials.password,
)
.execute(executor)
.await
.map(|result| result.rows_affected())
.map_err(|err| err.into())
}
不过,我的测试无限期挂起,因为它等待从未发生的提交:
#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
let repo = new_repo();
let db: Pool<Postgres> = connect().await;
let credentials = new_random_credentials();
println!("TX1 begins");
let mut tx1 = db.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("TX2 begins");
let mut tx2 = db.begin().await.unwrap();
println!("It hangs on the next line");
let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("It never reaches this line");
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
}
如何并行创建和执行这些 TX,以便在尝试提交第二个 TX 时断言通过但测试失败?
作为参考,这是我的 Cargo.toml
[package]
name = "auth"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"
# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }
[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]
[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]
[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]
## DEV ##
[dev-dependencies]
mockall = "0.11.0"
[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]
您可以使用 async_std::future::timeout
or tokio::time::timeout
。使用 async_std 的示例:
use async_std::future;
use std::time::Duration;
let max_duration = Duration::from_millis(100);
assert!(timeout(max_duration, tx2.commit()).await.is_err());
如果你想在完成tx1
之前继续tx2
,你可以先async_std::task::spawn
or tokio::spawn
tx1:
async_std::task::spawn(async move {
assert!(tx1.commit().await.is_ok());
});
@Mika 为我指出了正确的方向,我可以生成两个事务并添加一些超时时间来为并发 TX 提供一些时间来执行。
let handle1 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db1.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials1).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let handle2 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db2.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials2).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let (_first, _second) = rocket::tokio::try_join!(handle1, handle2).unwrap();
我认为这样两个 TX 将并行执行直到睡眠线,然后一个提交,另一个在提交线上失败。但是不,实际上两个 TX 是并行执行的,TX1 一直运行到睡眠,TX2 在插入行上阻塞直到 TX1 提交,然后 TX2 在插入行上失败。
我想这就是 DB 在这种情况下的工作方式,也许我可以通过扰乱 TX 隔离来改变它,但这不是我的意图。我玩游戏是为了学习更多,今天的学习就够了:)