如何并行化这个循环?

How to parallelize this loop?

我在借用检查器上碰了壁,我想这是因为它无法判断:主线程 join 之后,工作线程就不再使用这些数据了。我已将问题简化为具有相同模式的最简示例。我想避免不必要的分配和复制,尤其是对 b。

/// Single-threaded reference version of the computation.
///
/// Runs 100 rounds of:
/// 1. XOR each element of `a` into the element of `b` at the same index
///    (the "heavy operation": `a` is only read, `b` is modified),
/// 2. sort `b` ascending,
/// 3. increment every element of `a` before the next round.
///
/// Returns the final `b`.
fn sequential_fn() -> Vec<i32> {
    let mut a = vec![1, 2, 3, 4]; // two mutable vectors
    let mut b = vec![0; 4];

    for _ in 0..100 {
        // `zip` walks both vectors together, so there is no hard-coded length
        // to keep in sync with the vectors and no per-element bounds check.
        for (dst, src) in b.iter_mut().zip(&a) {
            *dst ^= *src;
        }
        // Heavy operation over; sort `b`. For integers an unstable sort gives
        // exactly the same order as `sort`, without its scratch allocation.
        b.sort_unstable();
        // `a` is modified by the main thread before the process repeats.
        for e in a.iter_mut() {
            *e += 1;
        }
    }
    b
}

上面的顺序版本可以编译并正常运行。下面是我尝试把繁重的操作并行化、以拆分工作负载的写法:

fn parallel_fn() -> Vec<i32> // NOTE: intentionally does not compile as written; the compiler errors are listed below
{
    let mut a = vec![1,2,3,4]; // read by both threads, then mutated by the main thread
    let mut b = vec![0;4]; // halves written by the two threads, then sorted by the main thread
    let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)

    for repetitions in 0..100
    {
        //only 1 worker for example purposes; std::thread::spawn requires a 'static closure, so it cannot borrow a, b1 or b2 (err1, err2)
        let worker = std::thread::spawn(||  //(err2)
        {
            for i in 2..4 
            {
                b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
            }
        });
        for i in 0..2 // the main thread handles b[0..2] itself
        {
            b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a 
        }
        worker.join(); //workers finish (the returned Result is discarded, so a panic in the worker would go unnoticed)
        b.sort(); //borrow b as mutable in main thread only (err3)
        for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
            { *e += 1;}
    }
    return b;
}

我遇到的错误是:

error[E0597]: `b` does not live long enough
  --> src/lib.rs:5:29
   |
5  |     let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
   |                             ^^^^^^^^^^^^^^^^^
   |                             |
   |                             borrowed value does not live long enough
   |                             argument requires that `b` is borrowed for `'static`
...
27 | }
   | - `b` dropped here while still borrowed

error[E0499]: cannot borrow `*b2` as mutable more than once at a time
  --> src/lib.rs:10:41
   |
10 |           let worker = std::thread::spawn(||  //(err2)
   |                        -                  ^^ `*b2` was mutably borrowed here in the previous iteration of the loop
   |  ______________________|
   | |
11 | |         {
12 | |             for i in 2..4 
13 | |             {
14 | |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
   | |                 -- borrows occur due to use of `*b2` in closure
15 | |             }
16 | |         });
   | |__________- argument requires that `*b2` is borrowed for `'static`

error[E0373]: closure may outlive the current function, but it borrows `a`, which is owned by the current function
  --> src/lib.rs:10:41
   |
10 |         let worker = std::thread::spawn(||  //(err2)
   |                                         ^^ may outlive borrowed value `a`
...
14 |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
   |                            - `a` is borrowed here
   |
note: function requires argument type to outlive `'static`
  --> src/lib.rs:10:22
   |
10 |           let worker = std::thread::spawn(||  //(err2)
   |  ______________________^
11 | |         {
12 | |             for i in 2..4 
13 | |             {
14 | |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
15 | |             }
16 | |         });
   | |__________^
help: to force the closure to take ownership of `a` (and any other referenced variables), use the `move` keyword
   |
10 |         let worker = std::thread::spawn(move ||  //(err2)
   |                                         ++++

error[E0499]: cannot borrow `b` as mutable more than once at a time
  --> src/lib.rs:22:9
   |
5  |     let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
   |                             ----------------- first mutable borrow occurs here
...
19 |             b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a 
   |             ----- first borrow later used here
...
22 |         b.sort(); //borrow b as mutable in main thread only (err3)
   |         ^^^^^^^^ second mutable borrow occurs here

error[E0502]: cannot borrow `a` as mutable because it is also borrowed as immutable
  --> src/lib.rs:23:18
   |
10 |           let worker = std::thread::spawn(||  //(err2)
   |                        -                  -- immutable borrow occurs here
   |  ______________________|
   | |
11 | |         {
12 | |             for i in 2..4 
13 | |             {
14 | |                 b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
   | |                            - first borrow occurs due to use of `a` in closure
15 | |             }
16 | |         });
   | |__________- argument requires that `a` is borrowed for `'static`
...
23 |           for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
   |                    ^^^^^^^^^^^^ mutable borrow occurs here

error[E0505]: cannot move out of `b` because it is borrowed
  --> src/lib.rs:26:12
   |
5  |     let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
   |                             -----------------
   |                             |
   |                             borrow of `b` occurs here
   |                             argument requires that `b` is borrowed for `'static`
...
26 |     return b;
   |            ^ move out of `b` occurs here

Playground

你的代码依赖了一些假设,而 Rust 借用检查器无法在编译时验证这些假设:

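  • parallel_fn 返回时,a 和 b 会被释放。你调用 .join() 来确保生成的线程在此之前已经结束,但这是运行时的行为,Rust 无法在编译时验证它。这就是为什么不能把引用传给 std::thread::spawn(它要求闭包满足 'static),所以你需要 Arc。(补充:自 Rust 1.63 起,std::thread::scope 能在编译期保证作用域内生成的线程在作用域结束前全部 join,因此可以直接借用 a 和 b,而无需 Arc。)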
  • 你调用 .join() 是为了确保修改 a 时没有其他线程正在读取它,但编译器同样无法检查这一点。
  • 你在不同线程中修改同一个向量的不同部分;除非格外小心,否则这是不安全的,所以这里需要一些 unsafe 代码。另一种做法是把向量拆分成 2 个独立的向量(而不是切片),处理完后再合并,但这会带来额外的复制。(补充:split_at_mut 返回的两个可变切片互不重叠,若配合 std::thread::scope 分别交给不同线程,则完全不需要 unsafe。)

我最终写出了下面的方案:

use std::sync::{Mutex, Arc, RwLock};

/// Owns the vector `b` together with raw pointers to its two halves, so that
/// each half can be mutated through `&self`, potentially from different threads.
///
/// Both pointers address elements inside `vec`'s heap buffer (they are taken
/// in `new` via `split_at_mut(2)`). They are valid only as long as that buffer
/// is neither reallocated nor freed; no method of this type resizes `vec`, and
/// `consume` hands the vector back by value.
///
/// NOTE(review): the derived `Debug` reads `vec` without taking either mutex,
/// so formatting a shared `VectorHolder` while `op1`/`op2` run on another
/// thread would race with their writes.
#[derive(Debug)]
struct VectorHolder {
    /// The full vector; both slice pointers alias parts of its buffer.
    vec: Vec<i32>,
    /// Pointer to `vec[..2]`; locked by `op2` while it writes that half.
    slice1_ptr: Mutex<*mut [i32]>,
    /// Pointer to `vec[2..]`; locked by `op1` while it writes that half.
    slice2_ptr: Mutex<*mut [i32]>,
}

impl VectorHolder {

    /// Takes ownership of `vec` and stores raw pointers to its two halves:
    /// `vec[..2]` (in `slice1_ptr`) and `vec[2..]` (in `slice2_ptr`).
    ///
    /// The pointers are taken before `vec` is moved into `Self`. Moving a
    /// `Vec` does not move its heap buffer, so the pointers still address the
    /// stored vector's elements.
    /// NOTE(review): run this under `cargo miri` to confirm the pointer
    /// derivation is accepted by Rust's aliasing model.
    ///
    /// # Panics
    ///
    /// Panics if `vec.len() < 2` (`split_at_mut` requires `mid <= len`).
    pub fn new(mut vec: Vec<i32>) -> Self {
        let (slice1 , slice2) = vec.split_at_mut(2);
        let slice1_ptr = slice1 as *mut _;
        let slice2_ptr = slice2 as *mut _;
        Self {
            vec,
            slice1_ptr: Mutex::new(slice1_ptr),
            slice2_ptr: Mutex::new(slice2_ptr),
        }
    }

    /// XORs `a[2..4]` into `vec[2..4]` through the second-half pointer.
    ///
    /// `op1` writes only `vec[2..]` and `op2` only `vec[..2]`, so the two may
    /// run at the same time. `operations_on_b` relies on this: the worker calls
    /// `op1` while the main thread calls `op2`, and only then joins.
    /// Each mutex serializes concurrent calls of the *same* op; without it,
    /// two simultaneous `op1` calls would create two aliasing `&mut` to one
    /// half. The mutexes could only be removed if each op is guaranteed never
    /// to run on two threads at once.
    ///
    /// # Panics
    ///
    /// Panics if `a.len() < 4`, if `vec.len() < 4`, or if the mutex is poisoned.
    pub fn op1(&self, a: &[i32]) {
        let mut guard = self.slice2_ptr.lock().unwrap();
        // SAFETY: the pointer was derived from `self.vec` in `new`, and no
        // method resizes `vec`, so it still points at live elements. While
        // `guard` is held no other `&mut` to this half is created by `op1`.
        // Dereferencing a raw pointer yields an unbounded lifetime, so the
        // compiler does NOT tie `b2` to `guard`. `b2` must only be used
        // before `guard` drops at the end of this function, as it is below.
        let b2 = unsafe { &mut **guard };
        for i in 2..4 {
            b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
        }
    }

    /// XORs `a[0..2]` into `vec[0..2]` through the first-half pointer.
    ///
    /// Disjoint from `op1`, so the two may run concurrently (see `op1`).
    ///
    /// # Panics
    ///
    /// Panics if `a.len() < 2` or if the mutex is poisoned.
    pub fn op2(&self, a: &[i32]) {
        let mut guard = self.slice1_ptr.lock().unwrap();
        // SAFETY: same reasoning as in `op1`, for the `vec[..2]` half; `b1`
        // is only used while `guard` is still held.
        let b1 = unsafe { &mut **guard };
        for i in 0..2 {
            b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
        }
    }

    /// Returns the owned vector, including whatever `op1`/`op2` wrote into it.
    /// Taking `self` by value means no `&self`, and therefore no op call, can
    /// still be in progress.
    pub fn consume(self) -> Vec<i32> {
        self.vec
    }
}

// SAFETY: `*mut [i32]` is neither `Send` nor `Sync`, so these impls must be
// asserted by hand. They rely on three invariants:
// - the two pointers address disjoint halves of `vec`;
// - each half is written only while its own mutex is held (`op1` / `op2`);
// - `vec` is never resized while the holder is shared.
// NOTE(review): the derived `Debug` impl reads `vec` without locking, so
// formatting a shared holder while `op1`/`op2` run elsewhere would be a data
// race. Avoid `{:?}` on it while it is shared between threads.
unsafe impl Send for VectorHolder { }
unsafe impl Sync for VectorHolder { }

/// XORs `a` into `b` element-wise, splitting the work between the calling
/// thread and one worker thread, then returns `b` sorted ascending.
///
/// `b` is split at its midpoint with `split_at_mut`, and each half is XORed
/// with the elements of `a` at the same indices. The halves are disjoint
/// `&mut` slices, so they can be handed to different threads without any
/// `unsafe`. `std::thread::scope` guarantees the worker has finished before
/// the borrows of `a` and `b` end, so no `'static` bound and no raw-pointer
/// holder type are needed.
///
/// Elements of `b` at indices `>= a.len()` are left unchanged. Empty and short
/// inputs are handled without panicking.
///
/// # Panics
///
/// Panics if the `RwLock` around `a` is poisoned, or if the worker panics.
pub fn operations_on_b(mut b: Vec<i32>, a: Arc<RwLock<Vec<i32>>>) -> Vec<i32> {
    // XORs `src[i]` into `dst[i]` for every index both slices share.
    fn xor_into(dst: &mut [i32], src: &[i32]) {
        for (d, s) in dst.iter_mut().zip(src) {
            *d ^= *s;
        }
    }

    {
        // A single read guard serves both threads; it outlives the scope below.
        let a_guard = a.read().expect("lock around `a` was poisoned");
        let a_vals: &[i32] = &a_guard;

        let mid = b.len() / 2;
        let (b_lo, b_hi) = b.split_at_mut(mid);
        // Clamp so that a shorter `a` cannot make `split_at` panic.
        let (a_lo, a_hi) = a_vals.split_at(mid.min(a_vals.len()));

        // Every thread spawned in the scope is joined before `scope` returns;
        // a panic in the worker is propagated here.
        std::thread::scope(|s| {
            //only 1 worker for example purposes
            s.spawn(move || xor_into(b_hi, a_hi));
            xor_into(b_lo, a_lo);
        });
    } // read lock released here, before the (independent) sort

    // For integers an unstable sort yields the same order as `sort`, without
    // the scratch allocation.
    b.sort_unstable();
    b
}

/// Runs 100 rounds of: XOR `a` into `b` across two threads and sort `b` (both
/// done by `operations_on_b`), then increment every element of `a`.
///
/// Returns the final `b`. `b` is moved into `operations_on_b` and handed back
/// each round, so it is never copied.
fn parallel_fn() -> Vec<i32> {
    let a = Arc::new(RwLock::new(vec![1, 2, 3, 4]));
    let mut b = vec![0; 4];

    for _repetitions in 0..100 {
        b = operations_on_b(b, Arc::clone(&a));

        // `operations_on_b` has already joined its worker, so only the main
        // thread touches `a` here and the write lock is uncontended.
        for e in a.write().unwrap().iter_mut() {
            *e += 1;
        }
    }
    b
}


/// Entry point: prints the result of the parallel computation in `Debug` form.
fn main() {
    let result = parallel_fn();
    println!("{:?}", result);
}