使用 Mutex 防止同时处理相同类型的多线程列表迭代
Multithreaded list iteration while using Mutex to prevent dealing with the same type at the same time
我正在编写一个需要在多个线程上同时 运行 的应用程序。它将处理一长串项目,其中每个项目 属性 是 user_id
。我试图确保属于同一 user_id
的项目永远不会同时处理。这意味着闭包 运行 子线程需要等待,直到没有其他线程正在为同一用户处理数据。
我不知道如何解决这个问题。我的简化当前示例如下所示:
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
fn main() {
let pool = ThreadPool::new(num_cpus::get());
let mut locks: HashMap<String, Mutex<bool>> = HashMap::new();
let queue = Arc::new(vec![
"1".to_string(),
"1".to_string(),
"2".to_string(),
"1".to_string(),
"3".to_string(),
]);
let count = queue.len();
for i in 0..count {
let user_id = queue[i].clone();
// Problem: cannot borrow `locks` as mutable more than once at a time
// mutable borrow starts here in previous iteration of loop
let lock = locks.entry(user_id).or_insert(Mutex::new(true));
pool.execute(move || {
// Wait until the user_id becomes free.
lock.lock().unwrap();
// Do stuff with user_id, but never process
// the same user_id more than once at the same time.
println!("{:?}", user_id);
});
}
pool.join();
}
我正在尝试保留 Mutex
的列表,然后我用它来等待 user_id
变为空闲,但借用检查器不允许这样做。队列项目和项目处理代码在我正在处理的实际应用程序中要复杂得多。
不允许更改队列中项目的顺序(但由于等待锁定,允许进行一些更改)。
如何解决这种情况?
首先,HashMap::entry()
消耗了密钥,因此由于您也想在闭包中使用它,因此需要克隆它,即 .entry(user_id.clone())
.
由于您需要在主线程和工作线程之间共享 Mutex<bool>
,因此除非需要,否则您同样需要将其包装在 Arc
. You can also use Entry::or_insert_with()
, so you avoid needlessly creating a new Mutex
中。
let mut locks: HashMap<String, Arc<Mutex<bool>>> = HashMap::new();
// ...
let lock = locks
.entry(user_id.clone())
.or_insert_with(|| Arc::new(Mutex::new(true)))
.clone();
最后,必须存储lock()
返回的守卫,否则立即释放。
let _guard = lock.lock().unwrap();
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
fn main() {
let pool = ThreadPool::new(num_cpus::get());
let mut locks: HashMap<String, Arc<Mutex<bool>>> = HashMap::new();
let queue = Arc::new(vec![
"1".to_string(),
"1".to_string(),
"2".to_string(),
"1".to_string(),
"3".to_string(),
]);
let count = queue.len();
for i in 0..count {
let user_id = queue[i].clone();
let lock = locks
.entry(user_id.clone())
.or_insert_with(|| Arc::new(Mutex::new(true)))
.clone();
pool.execute(move || {
// Wait until the user_id becomes free.
let _guard = lock.lock().unwrap();
// Do stuff with user_id, but never process
// the same user_id more than once at the same time.
println!("{:?}", user_id);
});
}
pool.join();
}
我正在编写一个需要在多个线程上同时 运行 的应用程序。它将处理一长串项目,其中每个项目 属性 是 user_id
。我试图确保属于同一 user_id
的项目永远不会同时处理。这意味着闭包 运行 子线程需要等待,直到没有其他线程正在为同一用户处理数据。
我不知道如何解决这个问题。我的简化当前示例如下所示:
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
fn main() {
let pool = ThreadPool::new(num_cpus::get());
let mut locks: HashMap<String, Mutex<bool>> = HashMap::new();
let queue = Arc::new(vec![
"1".to_string(),
"1".to_string(),
"2".to_string(),
"1".to_string(),
"3".to_string(),
]);
let count = queue.len();
for i in 0..count {
let user_id = queue[i].clone();
// Problem: cannot borrow `locks` as mutable more than once at a time
// mutable borrow starts here in previous iteration of loop
let lock = locks.entry(user_id).or_insert(Mutex::new(true));
pool.execute(move || {
// Wait until the user_id becomes free.
lock.lock().unwrap();
// Do stuff with user_id, but never process
// the same user_id more than once at the same time.
println!("{:?}", user_id);
});
}
pool.join();
}
我正在尝试保留 Mutex
的列表,然后我用它来等待 user_id
变为空闲,但借用检查器不允许这样做。队列项目和项目处理代码在我正在处理的实际应用程序中要复杂得多。
不允许更改队列中项目的顺序(但由于等待锁定,允许进行一些更改)。
如何解决这种情况?
首先,HashMap::entry()
消耗了密钥,因此由于您也想在闭包中使用它,因此需要克隆它,即 .entry(user_id.clone())
.
由于您需要在主线程和工作线程之间共享 Mutex<bool>
,因此除非需要,否则您同样需要将其包装在 Arc
. You can also use Entry::or_insert_with()
, so you avoid needlessly creating a new Mutex
中。
let mut locks: HashMap<String, Arc<Mutex<bool>>> = HashMap::new();
// ...
let lock = locks
.entry(user_id.clone())
.or_insert_with(|| Arc::new(Mutex::new(true)))
.clone();
最后,必须存储lock()
返回的守卫,否则立即释放。
let _guard = lock.lock().unwrap();
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
fn main() {
let pool = ThreadPool::new(num_cpus::get());
let mut locks: HashMap<String, Arc<Mutex<bool>>> = HashMap::new();
let queue = Arc::new(vec![
"1".to_string(),
"1".to_string(),
"2".to_string(),
"1".to_string(),
"3".to_string(),
]);
let count = queue.len();
for i in 0..count {
let user_id = queue[i].clone();
let lock = locks
.entry(user_id.clone())
.or_insert_with(|| Arc::new(Mutex::new(true)))
.clone();
pool.execute(move || {
// Wait until the user_id becomes free.
let _guard = lock.lock().unwrap();
// Do stuff with user_id, but never process
// the same user_id more than once at the same time.
println!("{:?}", user_id);
});
}
pool.join();
}