在异步 Rust 中处理重复插入数据库
Handling duplicate inserts into database in async rust
这里是 Rust 和异步编程的初学者。
我有一个功能可以下载一堆推文并将其存储在数据库中:
pub async fn process_user_timeline(config: &Settings, pool: &PgPool, user_object: &Value) {
// get timeline
if let Ok((user_timeline, _)) =
get_user_timeline(config, user_object["id"].as_str().unwrap()).await
{
// store tweets
if let Some(tweets) = user_timeline["data"].as_array() {
for tweet in tweets.iter() {
store_tweet(pool, &tweet, &user_timeline, "normal")
.await
.unwrap_or_else(|e| {
println!(
">>>X>>> failed to store tweet {}: {:?}",
tweet["id"].as_str().unwrap(),
e
)
});
}
}
}
}
另一个函数在异步循环中调用它:
pub async fn loop_until_hit_rate_limit<'a, T, Fut>(
object_arr: &'a [T],
settings: &'a Settings,
pool: &'a PgPool,
f: impl Fn(&'a Settings, &'a PgPool, &'a T) -> Fut + Copy,
rate_limit: usize,
) where
Fut: Future,
{
let total = object_arr.len();
let capped_total = min(total, rate_limit);
let mut futs = vec![];
for (i, object) in object_arr[..capped_total].iter().enumerate() {
futs.push(async move {
println!(">>> PROCESSING {}/{}", i + 1, total);
f(settings, pool, object).await;
});
}
futures::future::join_all(futs).await;
}
有时两个异步任务会尝试同时插入同一条推文,从而产生此错误:
failed to store tweet 1398307091442409475: Database(PgDatabaseError { severity: Error, code: "23505", message: "duplicate key value violates unique constraint \"tweets_tweet_id_key\"", detail: Some("Key (tweet_id)=(1398307091442409475) already exists."), hint: None, position: None, where: None, schema: Some("public"), table: Some("tweets"), column: None, data_type: None, constraint: Some("tweets_tweet_id_key"), file: Some("nbtinsert.c"), line: Some(656), routine: Some("_bt_check_unique") })
请注意,代码在插入推文之前已经检查推文是否存在,因此这仅在以下情况下发生:从任务 1 读取 > 从任务 2 读取 > 从任务 1 写入(成功)> 从任务写入2(错误)。
为了解决这个问题,到目前为止,我最好的尝试是放置一个 unwrap_or_else()
子句,让其中一个任务失败而不会在整个执行过程中出现恐慌。我知道至少有一个缺点——有时这两项任务都会失败,而推文永远不会被写入。它发生在 <1% 的情况下,但确实发生了。
我的方法还有其他我不知道的缺点吗?
处理这个问题的正确方法是什么?我讨厌丢失数据,更糟糕的是不确定地这样做。
PS 我正在使用 actix web
和 sqlx
作为我的网络服务器/数据库库。
通常对于任何可能由多个 threads/processes 编写的东西,任何类似
的逻辑
if (!exists) {
writeValue()
}
需要通过某种锁来保护,或者需要将代码更改为原子写入,写入可能会失败,因为已经有其他东西写入了它。
对于 Rust 中的内存数据,您将使用 Mutex
来确保您可以读取数据,然后在其他任何内容读取数据之前将数据写回,或者使用 Atomic
来修改数据这样一来,如果已经写了东西,您就可以检测到它。
在数据库中,对于任何可能与大约同时发生的其他查询发生冲突的查询,您需要在查询中使用 ON CONFLICT
子句,以便数据库本身知道何时该做什么它尝试写入数据并且它已经存在。
对于你的情况,因为我猜推文是不可变的,你可能想要做 ON CONFLICT tweet_id DO NOTHING
(或者你的 ID 列是什么),在这种情况下 INSERT
将跳过如果已经有一条带有您要插入的 ID 的推文,则插入,并且不会抛出错误。
这里是 Rust 和异步编程的初学者。
我有一个功能可以下载一堆推文并将其存储在数据库中:
pub async fn process_user_timeline(config: &Settings, pool: &PgPool, user_object: &Value) {
// get timeline
if let Ok((user_timeline, _)) =
get_user_timeline(config, user_object["id"].as_str().unwrap()).await
{
// store tweets
if let Some(tweets) = user_timeline["data"].as_array() {
for tweet in tweets.iter() {
store_tweet(pool, &tweet, &user_timeline, "normal")
.await
.unwrap_or_else(|e| {
println!(
">>>X>>> failed to store tweet {}: {:?}",
tweet["id"].as_str().unwrap(),
e
)
});
}
}
}
}
另一个函数在异步循环中调用它:
pub async fn loop_until_hit_rate_limit<'a, T, Fut>(
object_arr: &'a [T],
settings: &'a Settings,
pool: &'a PgPool,
f: impl Fn(&'a Settings, &'a PgPool, &'a T) -> Fut + Copy,
rate_limit: usize,
) where
Fut: Future,
{
let total = object_arr.len();
let capped_total = min(total, rate_limit);
let mut futs = vec![];
for (i, object) in object_arr[..capped_total].iter().enumerate() {
futs.push(async move {
println!(">>> PROCESSING {}/{}", i + 1, total);
f(settings, pool, object).await;
});
}
futures::future::join_all(futs).await;
}
有时两个异步任务会尝试同时插入同一条推文,从而产生此错误:
failed to store tweet 1398307091442409475: Database(PgDatabaseError { severity: Error, code: "23505", message: "duplicate key value violates unique constraint \"tweets_tweet_id_key\"", detail: Some("Key (tweet_id)=(1398307091442409475) already exists."), hint: None, position: None, where: None, schema: Some("public"), table: Some("tweets"), column: None, data_type: None, constraint: Some("tweets_tweet_id_key"), file: Some("nbtinsert.c"), line: Some(656), routine: Some("_bt_check_unique") })
请注意,代码在插入推文之前已经检查推文是否存在,因此这仅在以下情况下发生:从任务 1 读取 > 从任务 2 读取 > 从任务 1 写入(成功)> 从任务写入2(错误)。
为了解决这个问题,到目前为止,我最好的尝试是放置一个 unwrap_or_else()
子句,让其中一个任务失败而不会在整个执行过程中出现恐慌。我知道至少有一个缺点——有时这两项任务都会失败,而推文永远不会被写入。它发生在 <1% 的情况下,但确实发生了。
我的方法还有其他我不知道的缺点吗?
处理这个问题的正确方法是什么?我讨厌丢失数据,更糟糕的是不确定地这样做。
PS 我正在使用 actix web
和 sqlx
作为我的网络服务器/数据库库。
通常对于任何可能由多个 threads/processes 编写的东西,任何类似
的逻辑if (!exists) {
writeValue()
}
需要通过某种锁来保护,或者需要将代码更改为原子写入,写入可能会失败,因为已经有其他东西写入了它。
对于 Rust 中的内存数据,您将使用 Mutex
来确保您可以读取数据,然后在其他任何内容读取数据之前将数据写回,或者使用 Atomic
来修改数据这样一来,如果已经写了东西,您就可以检测到它。
在数据库中,对于任何可能与大约同时发生的其他查询发生冲突的查询,您需要在查询中使用 ON CONFLICT
子句,以便数据库本身知道何时该做什么它尝试写入数据并且它已经存在。
对于你的情况,因为我猜推文是不可变的,你可能想要做 ON CONFLICT tweet_id DO NOTHING
(或者你的 ID 列是什么),在这种情况下 INSERT
将跳过如果已经有一条带有您要插入的 ID 的推文,则插入,并且不会抛出错误。