Rust中多线程对非重叠非连续索引的无锁处理

Lockless processing of non overlapping non contiguous indexes by multiple threads in Rust

我正在练习 Rust 并决定创建一个 Matrix ops/factorization 项目。

基本上我希望能够在多个线程中处理底层向量。由于我将为每个线程提供非重叠索引(可能连续也可能不连续)并且线程将在创建它们的任何函数结束之前连接,因此不需要锁/同步。

我知道有几个 crate 可以做到这一点,但我想知道是否有一种相对惯用的 crate-free 方法可以自己实现。

我能想到的最好的办法是(稍微简化了代码):

use std::thread;

//This represents the Matrix
#[derive(Debug, Clone)]
pub struct MainStruct {
    pub data: Vec<f64>,
}
//This is the bit that will be shared by the threads, 
//ideally it should have its lifetime tied to that of MainStruct
//but i have no idea how to make phantomdata work in this case
#[derive(Debug, Clone)]
pub struct SliceTest {
    pub data: Vec<SubSlice>,
}
//This struct is to hide *mut f64 to allow it to be shared to other threads
#[derive(Debug, Clone)]
pub struct SubSlice {
    pub data: *mut f64,
}

impl MainStruct {
    pub fn slice(&mut self) -> (SliceTest, SliceTest) {
        let mut out_vec_odd: Vec<SubSlice> = Vec::new();

        let mut out_vec_even: Vec<SubSlice> = Vec::new();

        unsafe {
            let ptr = self.data.as_mut_ptr();

            for i in 0..self.data.len() {
                let ptr_to_push = ptr.add(i);
                //Non contiguous idxs
                if i % 2 == 0 {
                    out_vec_even.push(SubSlice{data:ptr_to_push});
                } else {
                    out_vec_odd.push(SubSlice{data:ptr_to_push});
                }
            }
        }

        (SliceTest{data: out_vec_even}, SliceTest{data: out_vec_odd})
    }
}

impl SubSlice {
    pub fn set(&self, val: f64) {
        unsafe {*(self.data) = val;}
    }
}
unsafe impl Send for SliceTest {}
unsafe impl Send for SubSlice {}

fn main() {
    let mut maindata = MainStruct {
        data: vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
    };

    let (mut outvec1, mut outvec2) = maindata.slice();
    let mut threads = Vec::new();

    threads.push(
        thread::spawn(move || {
            for i in 0..outvec1.data.len() {
                outvec1.data[i].set(999.9);
            }
        })
    );
    threads.push(
        thread::spawn(move || {
            for i in 0..outvec2.data.len() {
                outvec2.data[i].set(999.9);
            }
        })
    );

    for handles in threads {
        handles.join();
    }

    println!("maindata = {:?}", maindata.data);
}

编辑: 按照下面的 kmdreko 建议,在不使用不安全代码的情况下让代码完全按照我想要的方式工作,耶!

当然,就性能而言,复制 f64 切片可能比创建可变引用向量更便宜,除非您的结构填充了其他结构而不是 f64

extern crate crossbeam;
use crossbeam::thread;

#[derive(Debug, Clone)]
pub struct Matrix {
    data: Vec<f64>,
    m: usize, //number of rows
    n: usize, //number of cols
}

...

impl Matrix {
    ...
    pub fn get_data_mut(&mut self) -> &mut Vec<f64> {
        &mut self.data
    }

    pub fn calculate_idx(max_cols: usize, i: usize, j: usize) -> usize {
        let actual_idx = j + max_cols * i;
        actual_idx
    }
    //Get individual mutable references for contiguous indexes (rows)
    pub fn get_all_row_slices(&mut self) -> Vec<Vec<&mut f64>> {
        let max_cols = self.max_cols();
        let max_rows = self.max_rows();
        let inner_data = self.get_data_mut().chunks_mut(max_cols);
        let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_rows);

        for chunk in inner_data {
            let row_vec = chunk.iter_mut().collect();
            out_vec.push(row_vec);
        }

        out_vec
    }
    //Get mutable references for disjoint indexes (columns)
    pub fn get_all_col_slices(&mut self) -> Vec<Vec<&mut f64>> {
        let max_cols = self.max_cols();
        let max_rows = self.max_rows();
        let inner_data = self.get_data_mut().chunks_mut(max_cols);
        let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_cols);

        for _ in 0..max_cols {
            out_vec.push(Vec::with_capacity(max_rows));
        }

        let mut inner_idx = 0;

        for chunk in inner_data {
            let row_vec_it = chunk.iter_mut();

            for elem in row_vec_it {
                out_vec[inner_idx].push(elem);
                inner_idx += 1;
            }

            inner_idx = 0;
        }

        out_vec
    }
    ...
}

fn test_multithreading() {
    fn test(in_vec: Vec<&mut f64>) {
        for elem in in_vec {
            *elem = 33.3;
        }
    }

    fn launch_task(mat: &mut Matrix, f: fn(Vec<&mut f64>)) {

        let test_vec = mat.get_all_row_slices();
        thread::scope(|s| {
            for elem in test_vec.into_iter() {
                s.spawn(move |_| {
                        println!("Spawning thread...");
                        f(elem);
                    });
            }
        }).unwrap();
    }

    let rows = 4;
    let cols = 3;
    //new function code omitted, returns Result<Self, MatrixError>
    let mut mat = Matrix::new(rows, cols).unwrap()

    launch_task(&mut mat, test);

    for i in 0..rows {
        for j in 0..cols {
            //Requires index trait implemented for matrix
            assert_eq!(mat[(i, j)], 33.3);
        }
    }
}

这个API不合理。由于没有生命周期注释绑定 SliceTestSubSliceMainStruct,它们可以在数据被销毁后保留,如果使用会导致释放后使用错误。

虽然很容易让它变得安全;您可以使用 .iter_mut() 获取对元素的不同可变引用:

pub fn slice(&mut self) -> (Vec<&mut f64>, Vec<&mut f64>) {
    let mut out_vec_even = vec![];
    let mut out_vec_odd = vec![];
    
    for (i, item_ref) in self.data.iter_mut().enumerate() {
        if i % 2 == 0 {
            out_vec_even.push(item_ref);
        } else {
            out_vec_odd.push(item_ref);
        }
    }

    (out_vec_even, out_vec_odd)
}

然而,这又暴露出另一个问题:thread::spawn 无法保存对局部变量的引用。创建的线程可以在创建它们的范围之外继续存在,所以即使你做了 .join() 它们,你也不需要这样做。这也是您原始代码中的一个潜在问题,只是编译器无法对此发出警告。

没有简单的方法可以解决这个问题。您需要使用非引用方式在其他线程上使用数据,但那将使用 Arc,它不允许改变其数据,因此您必须求助于 Mutex,这是你试图避免的。

我建议使用 scope from the crossbeam crate,它允许您生成引用本地数据的线程。我知道你想避免使用板条箱,但我认为这是最好的解决方案。

playground 上查看工作版本。

参见: