Rust中多线程对非重叠非连续索引的无锁处理
Lockless processing of non overlapping non contiguous indexes by multiple threads in Rust
我正在练习 Rust 并决定创建一个 Matrix ops/factorization 项目。
基本上我希望能够在多个线程中处理底层向量。由于我将为每个线程提供非重叠索引(可能连续也可能不连续)并且线程将在创建它们的任何函数结束之前连接,因此不需要锁/同步。
我知道有几个 crate 可以做到这一点,但我想知道是否有一种相对惯用的 crate-free 方法可以自己实现。
我能想到的最好的办法是(稍微简化了代码):
use std::thread;
//This represents the Matrix
#[derive(Debug, Clone)]
pub struct MainStruct {
pub data: Vec<f64>,
}
//This is the bit that will be shared by the threads,
//ideally it should have its lifetime tied to that of MainStruct
//but i have no idea how to make phantomdata work in this case
#[derive(Debug, Clone)]
pub struct SliceTest {
pub data: Vec<SubSlice>,
}
//This struct is to hide *mut f64 to allow it to be shared to other threads
#[derive(Debug, Clone)]
pub struct SubSlice {
pub data: *mut f64,
}
impl MainStruct {
pub fn slice(&mut self) -> (SliceTest, SliceTest) {
let mut out_vec_odd: Vec<SubSlice> = Vec::new();
let mut out_vec_even: Vec<SubSlice> = Vec::new();
unsafe {
let ptr = self.data.as_mut_ptr();
for i in 0..self.data.len() {
let ptr_to_push = ptr.add(i);
//Non contiguous idxs
if i % 2 == 0 {
out_vec_even.push(SubSlice{data:ptr_to_push});
} else {
out_vec_odd.push(SubSlice{data:ptr_to_push});
}
}
}
(SliceTest{data: out_vec_even}, SliceTest{data: out_vec_odd})
}
}
impl SubSlice {
pub fn set(&self, val: f64) {
unsafe {*(self.data) = val;}
}
}
unsafe impl Send for SliceTest {}
unsafe impl Send for SubSlice {}
fn main() {
let mut maindata = MainStruct {
data: vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
};
let (mut outvec1, mut outvec2) = maindata.slice();
let mut threads = Vec::new();
threads.push(
thread::spawn(move || {
for i in 0..outvec1.data.len() {
outvec1.data[i].set(999.9);
}
})
);
threads.push(
thread::spawn(move || {
for i in 0..outvec2.data.len() {
outvec2.data[i].set(999.9);
}
})
);
for handles in threads {
handles.join();
}
println!("maindata = {:?}", maindata.data);
}
编辑:
按照下面的 kmdreko 建议,在不使用不安全代码的情况下让代码完全按照我想要的方式工作,耶!
当然,就性能而言,复制 f64 切片可能比创建可变引用向量更便宜,除非您的结构填充了其他结构而不是 f64
extern crate crossbeam;
use crossbeam::thread;
#[derive(Debug, Clone)]
pub struct Matrix {
data: Vec<f64>,
m: usize, //number of rows
n: usize, //number of cols
}
...
impl Matrix {
...
pub fn get_data_mut(&mut self) -> &mut Vec<f64> {
&mut self.data
}
pub fn calculate_idx(max_cols: usize, i: usize, j: usize) -> usize {
let actual_idx = j + max_cols * i;
actual_idx
}
//Get individual mutable references for contiguous indexes (rows)
pub fn get_all_row_slices(&mut self) -> Vec<Vec<&mut f64>> {
let max_cols = self.max_cols();
let max_rows = self.max_rows();
let inner_data = self.get_data_mut().chunks_mut(max_cols);
let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_rows);
for chunk in inner_data {
let row_vec = chunk.iter_mut().collect();
out_vec.push(row_vec);
}
out_vec
}
//Get mutable references for disjoint indexes (columns)
pub fn get_all_col_slices(&mut self) -> Vec<Vec<&mut f64>> {
let max_cols = self.max_cols();
let max_rows = self.max_rows();
let inner_data = self.get_data_mut().chunks_mut(max_cols);
let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_cols);
for _ in 0..max_cols {
out_vec.push(Vec::with_capacity(max_rows));
}
let mut inner_idx = 0;
for chunk in inner_data {
let row_vec_it = chunk.iter_mut();
for elem in row_vec_it {
out_vec[inner_idx].push(elem);
inner_idx += 1;
}
inner_idx = 0;
}
out_vec
}
...
}
fn test_multithreading() {
fn test(in_vec: Vec<&mut f64>) {
for elem in in_vec {
*elem = 33.3;
}
}
fn launch_task(mat: &mut Matrix, f: fn(Vec<&mut f64>)) {
let test_vec = mat.get_all_row_slices();
thread::scope(|s| {
for elem in test_vec.into_iter() {
s.spawn(move |_| {
println!("Spawning thread...");
f(elem);
});
}
}).unwrap();
}
let rows = 4;
let cols = 3;
//new function code omitted, returns Result<Self, MatrixError>
let mut mat = Matrix::new(rows, cols).unwrap()
launch_task(&mut mat, test);
for i in 0..rows {
for j in 0..cols {
//Requires index trait implemented for matrix
assert_eq!(mat[(i, j)], 33.3);
}
}
}
这个API不合理。由于没有生命周期注释绑定 SliceTest
和 SubSlice
到 MainStruct
,它们可以在数据被销毁后保留,如果使用会导致释放后使用错误。
虽然很容易让它变得安全;您可以使用 .iter_mut()
获取对元素的不同可变引用:
pub fn slice(&mut self) -> (Vec<&mut f64>, Vec<&mut f64>) {
let mut out_vec_even = vec![];
let mut out_vec_odd = vec![];
for (i, item_ref) in self.data.iter_mut().enumerate() {
if i % 2 == 0 {
out_vec_even.push(item_ref);
} else {
out_vec_odd.push(item_ref);
}
}
(out_vec_even, out_vec_odd)
}
然而,这又暴露出另一个问题:thread::spawn
无法保存对局部变量的引用。创建的线程可以在创建它们的范围之外继续存在,所以即使你做了 .join()
它们,你也不需要这样做。这也是您原始代码中的一个潜在问题,只是编译器无法对此发出警告。
没有简单的方法可以解决这个问题。您需要使用非引用方式在其他线程上使用数据,但那将使用 Arc
,它不允许改变其数据,因此您必须求助于 Mutex
,这是你试图避免的。
我建议使用 scope
from the crossbeam
crate,它允许您生成引用本地数据的线程。我知道你想避免使用板条箱,但我认为这是最好的解决方案。
在 playground 上查看工作版本。
参见:
我正在练习 Rust 并决定创建一个 Matrix ops/factorization 项目。
基本上我希望能够在多个线程中处理底层向量。由于我将为每个线程提供非重叠索引(可能连续也可能不连续)并且线程将在创建它们的任何函数结束之前连接,因此不需要锁/同步。
我知道有几个 crate 可以做到这一点,但我想知道是否有一种相对惯用的 crate-free 方法可以自己实现。
我能想到的最好的办法是(稍微简化了代码):
use std::thread;
//This represents the Matrix
#[derive(Debug, Clone)]
pub struct MainStruct {
pub data: Vec<f64>,
}
//This is the bit that will be shared by the threads,
//ideally it should have its lifetime tied to that of MainStruct
//but i have no idea how to make phantomdata work in this case
#[derive(Debug, Clone)]
pub struct SliceTest {
pub data: Vec<SubSlice>,
}
//This struct is to hide *mut f64 to allow it to be shared to other threads
#[derive(Debug, Clone)]
pub struct SubSlice {
pub data: *mut f64,
}
impl MainStruct {
pub fn slice(&mut self) -> (SliceTest, SliceTest) {
let mut out_vec_odd: Vec<SubSlice> = Vec::new();
let mut out_vec_even: Vec<SubSlice> = Vec::new();
unsafe {
let ptr = self.data.as_mut_ptr();
for i in 0..self.data.len() {
let ptr_to_push = ptr.add(i);
//Non contiguous idxs
if i % 2 == 0 {
out_vec_even.push(SubSlice{data:ptr_to_push});
} else {
out_vec_odd.push(SubSlice{data:ptr_to_push});
}
}
}
(SliceTest{data: out_vec_even}, SliceTest{data: out_vec_odd})
}
}
impl SubSlice {
pub fn set(&self, val: f64) {
unsafe {*(self.data) = val;}
}
}
unsafe impl Send for SliceTest {}
unsafe impl Send for SubSlice {}
fn main() {
let mut maindata = MainStruct {
data: vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
};
let (mut outvec1, mut outvec2) = maindata.slice();
let mut threads = Vec::new();
threads.push(
thread::spawn(move || {
for i in 0..outvec1.data.len() {
outvec1.data[i].set(999.9);
}
})
);
threads.push(
thread::spawn(move || {
for i in 0..outvec2.data.len() {
outvec2.data[i].set(999.9);
}
})
);
for handles in threads {
handles.join();
}
println!("maindata = {:?}", maindata.data);
}
编辑: 按照下面的 kmdreko 建议,在不使用不安全代码的情况下让代码完全按照我想要的方式工作,耶!
当然,就性能而言,复制 f64 切片可能比创建可变引用向量更便宜,除非您的结构填充了其他结构而不是 f64
extern crate crossbeam;
use crossbeam::thread;
#[derive(Debug, Clone)]
pub struct Matrix {
data: Vec<f64>,
m: usize, //number of rows
n: usize, //number of cols
}
...
impl Matrix {
...
pub fn get_data_mut(&mut self) -> &mut Vec<f64> {
&mut self.data
}
pub fn calculate_idx(max_cols: usize, i: usize, j: usize) -> usize {
let actual_idx = j + max_cols * i;
actual_idx
}
//Get individual mutable references for contiguous indexes (rows)
pub fn get_all_row_slices(&mut self) -> Vec<Vec<&mut f64>> {
let max_cols = self.max_cols();
let max_rows = self.max_rows();
let inner_data = self.get_data_mut().chunks_mut(max_cols);
let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_rows);
for chunk in inner_data {
let row_vec = chunk.iter_mut().collect();
out_vec.push(row_vec);
}
out_vec
}
//Get mutable references for disjoint indexes (columns)
pub fn get_all_col_slices(&mut self) -> Vec<Vec<&mut f64>> {
let max_cols = self.max_cols();
let max_rows = self.max_rows();
let inner_data = self.get_data_mut().chunks_mut(max_cols);
let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_cols);
for _ in 0..max_cols {
out_vec.push(Vec::with_capacity(max_rows));
}
let mut inner_idx = 0;
for chunk in inner_data {
let row_vec_it = chunk.iter_mut();
for elem in row_vec_it {
out_vec[inner_idx].push(elem);
inner_idx += 1;
}
inner_idx = 0;
}
out_vec
}
...
}
fn test_multithreading() {
fn test(in_vec: Vec<&mut f64>) {
for elem in in_vec {
*elem = 33.3;
}
}
fn launch_task(mat: &mut Matrix, f: fn(Vec<&mut f64>)) {
let test_vec = mat.get_all_row_slices();
thread::scope(|s| {
for elem in test_vec.into_iter() {
s.spawn(move |_| {
println!("Spawning thread...");
f(elem);
});
}
}).unwrap();
}
let rows = 4;
let cols = 3;
//new function code omitted, returns Result<Self, MatrixError>
let mut mat = Matrix::new(rows, cols).unwrap()
launch_task(&mut mat, test);
for i in 0..rows {
for j in 0..cols {
//Requires index trait implemented for matrix
assert_eq!(mat[(i, j)], 33.3);
}
}
}
这个API不合理。由于没有生命周期注释绑定 SliceTest
和 SubSlice
到 MainStruct
,它们可以在数据被销毁后保留,如果使用会导致释放后使用错误。
虽然很容易让它变得安全;您可以使用 .iter_mut()
获取对元素的不同可变引用:
pub fn slice(&mut self) -> (Vec<&mut f64>, Vec<&mut f64>) {
let mut out_vec_even = vec![];
let mut out_vec_odd = vec![];
for (i, item_ref) in self.data.iter_mut().enumerate() {
if i % 2 == 0 {
out_vec_even.push(item_ref);
} else {
out_vec_odd.push(item_ref);
}
}
(out_vec_even, out_vec_odd)
}
然而,这又暴露出另一个问题:thread::spawn
无法保存对局部变量的引用。创建的线程可以在创建它们的范围之外继续存在,所以即使你做了 .join()
它们,你也不需要这样做。这也是您原始代码中的一个潜在问题,只是编译器无法对此发出警告。
没有简单的方法可以解决这个问题。您需要使用非引用方式在其他线程上使用数据,但那将使用 Arc
,它不允许改变其数据,因此您必须求助于 Mutex
,这是你试图避免的。
我建议使用 scope
from the crossbeam
crate,它允许您生成引用本地数据的线程。我知道你想避免使用板条箱,但我认为这是最好的解决方案。
在 playground 上查看工作版本。
参见: