Rayon 中的每线程初始化
Per-thread initialization in Rayon
我正在尝试使用 Rayon 的 par_iter()
来优化我的功能。
单线程版本是这样的:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.iter().map(|tx| {
tx.verify_and_store(store)
}).collect();
...
}
每个 Store
实例只能由一个线程使用,但是 Store
的多个实例可以同时使用,所以我可以通过 clone
-ing store
:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.par_iter().map(|tx| {
let mut local_store = store.clone();
tx.verify_and_store(&mut local_store)
}).collect();
...
}
但是,这会在 每次 迭代时克隆 store
,这太慢了。我想每个线程使用一个商店实例。
Rayon 可以吗?或者我应该求助于手动线程和工作队列?
可以使用线程局部变量来确保 local_store
不会在给定线程中多次创建。
例如,编译 (full source):
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::cell::RefCell;
thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));
let mut result = Vec::new();
txs.par_iter().map(|tx| {
STORE.with(|cell| {
let mut local_store = cell.borrow_mut();
if local_store.is_none() {
*local_store = Some(store.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}).collect_into(&mut result);
}
但是,这段代码有两个问题。第一,如果 store
的克隆需要在 par_iter()
完成时做一些事情,比如刷新它们的缓冲区,这根本不会发生——它们的 Drop
只会在 Rayon 的 worker 时被调用线程退出,甚至 is not guaranteed.
第二个也是更严重的问题是,store
的克隆只为每个工作线程创建一次。如果 Rayon 缓存了它的线程池(我相信它确实缓存了),这意味着稍后对 verify_and_store
的不相关调用将继续使用 store
的最后已知克隆,这可能与当前无关商店。
这可以通过稍微复杂化代码来纠正:
将克隆的变量存储在 Mutex<Option<...>>
而不是 Option
中,以便调用 par_iter()
的线程可以访问它们。这将在每次访问时产生互斥锁,但该锁将是无竞争的,因此成本低。
在互斥量周围使用 Arc
以收集对向量中创建的存储克隆的引用。此向量用于通过在迭代完成后将它们重置为 None
来清理存储。
将整个调用包装在一个不相关的互斥体中,这样对 verify_and_store
的两个并行调用就不会看到彼此的存储克隆。 (如果在迭代之前创建并安装了一个新的线程池,这可能是可以避免的。)希望这种序列化不会影响 verify_and_store
的性能,因为每次调用都会使用整个线程池。
结果不尽人意,但它可以编译,仅使用安全代码,而且似乎可以工作:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::sync::{Arc, Mutex};
type SharedStore = Arc<Mutex<Option<Store>>>;
lazy_static! {
static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
static ref NO_REENTRY: Mutex<()> = Mutex::new(());
}
thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));
let mut result = Vec::new();
let _no_reentry = NO_REENTRY.lock();
txs.par_iter().map({
|tx| {
STORE.with(|arc_mtx| {
let mut local_store = arc_mtx.lock().unwrap();
if local_store.is_none() {
*local_store = Some(store.clone());
STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}
}).collect_into(&mut result);
let mut store_clones = STORE_CLONES.lock().unwrap();
for store in store_clones.drain(..) {
store.lock().unwrap().take();
}
}
老问题,但我觉得答案需要重新审视。一般来说,有两种方法:
使用map_with
。每当一个线程从另一个线程窃取一个工作项时,这将克隆。这可能会克隆比线程更多的商店,但它应该相当低。如果克隆太昂贵,您可以增加 rayon 的大小,将使用 with_min_len
.
拆分工作负载
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.iter().map_with(|| store.clone(), |store, tx| {
tx.verify_and_store(store)
}).collect();
...
}
或者使用 thread_local 包中的作用域 ThreadLocal
。这将确保您只使用与线程一样多的对象,并且一旦 ThreadLocal
对象超出范围,它们就会被销毁。
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let tl = ThreadLocal::new();
let result = txs.iter().map(|tx| {
let store = tl.get_or(|| Box::new(RefCell::new(store.clone)));
tx.verify_and_store(store.get_mut());
}).collect();
...
}
我正在尝试使用 Rayon 的 par_iter()
来优化我的功能。
单线程版本是这样的:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.iter().map(|tx| {
tx.verify_and_store(store)
}).collect();
...
}
每个 Store
实例只能由一个线程使用,但是 Store
的多个实例可以同时使用,所以我可以通过 clone
-ing store
:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.par_iter().map(|tx| {
let mut local_store = store.clone();
tx.verify_and_store(&mut local_store)
}).collect();
...
}
但是,这会在 每次 迭代时克隆 store
,这太慢了。我想每个线程使用一个商店实例。
Rayon 可以吗?或者我应该求助于手动线程和工作队列?
可以使用线程局部变量来确保 local_store
不会在给定线程中多次创建。
例如,编译 (full source):
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::cell::RefCell;
thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));
let mut result = Vec::new();
txs.par_iter().map(|tx| {
STORE.with(|cell| {
let mut local_store = cell.borrow_mut();
if local_store.is_none() {
*local_store = Some(store.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}).collect_into(&mut result);
}
但是,这段代码有两个问题。第一,如果 store
的克隆需要在 par_iter()
完成时做一些事情,比如刷新它们的缓冲区,这根本不会发生——它们的 Drop
只会在 Rayon 的 worker 时被调用线程退出,甚至 is not guaranteed.
第二个也是更严重的问题是,store
的克隆只为每个工作线程创建一次。如果 Rayon 缓存了它的线程池(我相信它确实缓存了),这意味着稍后对 verify_and_store
的不相关调用将继续使用 store
的最后已知克隆,这可能与当前无关商店。
这可以通过稍微复杂化代码来纠正:
将克隆的变量存储在
Mutex<Option<...>>
而不是Option
中,以便调用par_iter()
的线程可以访问它们。这将在每次访问时产生互斥锁,但该锁将是无竞争的,因此成本低。在互斥量周围使用
Arc
以收集对向量中创建的存储克隆的引用。此向量用于通过在迭代完成后将它们重置为None
来清理存储。将整个调用包装在一个不相关的互斥体中,这样对
verify_and_store
的两个并行调用就不会看到彼此的存储克隆。 (如果在迭代之前创建并安装了一个新的线程池,这可能是可以避免的。)希望这种序列化不会影响verify_and_store
的性能,因为每次调用都会使用整个线程池。
结果不尽人意,但它可以编译,仅使用安全代码,而且似乎可以工作:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
use std::sync::{Arc, Mutex};
type SharedStore = Arc<Mutex<Option<Store>>>;
lazy_static! {
static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
static ref NO_REENTRY: Mutex<()> = Mutex::new(());
}
thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));
let mut result = Vec::new();
let _no_reentry = NO_REENTRY.lock();
txs.par_iter().map({
|tx| {
STORE.with(|arc_mtx| {
let mut local_store = arc_mtx.lock().unwrap();
if local_store.is_none() {
*local_store = Some(store.clone());
STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
}
tx.verify_and_store(local_store.as_mut().unwrap())
})
}
}).collect_into(&mut result);
let mut store_clones = STORE_CLONES.lock().unwrap();
for store in store_clones.drain(..) {
store.lock().unwrap().take();
}
}
老问题,但我觉得答案需要重新审视。一般来说,有两种方法:
使用map_with
。每当一个线程从另一个线程窃取一个工作项时,这将克隆。这可能会克隆比线程更多的商店,但它应该相当低。如果克隆太昂贵,您可以增加 rayon 的大小,将使用 with_min_len
.
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let result = txs.iter().map_with(|| store.clone(), |store, tx| {
tx.verify_and_store(store)
}).collect();
...
}
或者使用 thread_local 包中的作用域 ThreadLocal
。这将确保您只使用与线程一样多的对象,并且一旦 ThreadLocal
对象超出范围,它们就会被销毁。
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
let tl = ThreadLocal::new();
let result = txs.iter().map(|tx| {
let store = tl.get_or(|| Box::new(RefCell::new(store.clone)));
tx.verify_and_store(store.get_mut());
}).collect();
...
}