如何并行化这个循环?
How to parallelize this loop?
我 运行 与借用检查器撞墙了,我想是因为它无法判断在主线程加入后工作人员不再使用的东西。我已将我的问题简化为相同模式的最简单示例。我试图避免不必要的分配和复制,尤其是 b
.
fn sequential_fn() -> Vec<i32>
{
let mut a = vec![1,2,3,4]; //two mutable arrays
let mut b = vec![0;4];
for repetitions in 0..100
{
for i in 0..4
{
b[i] ^= a[i]; //example heavy operation - a isnt modified but b is
}
b.sort(); //heavy operation over, b is sorted
for e in a.iter_mut() //a is modified by main thread before repeating process
{ *e += 1;}
}
return b; //return b
}
上面的顺序版本可以编译并且工作正常。这就是我试图并行化繁重的操作以拆分工作负载的方式:
fn parallel_fn() -> Vec<i32>
{
let mut a = vec![1,2,3,4];
let mut b = vec![0;4];
let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
for repetitions in 0..100
{
//only 1 worker for example purposes
let worker = std::thread::spawn(|| //(err2)
{
for i in 2..4
{
b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
}
});
for i in 0..2
{
b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
}
worker.join(); //workers finish
b.sort(); //borrow b as mutable in main thread only (err3)
for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
{ *e += 1;}
}
return b;
}
我遇到的错误是:
error[E0597]: `b` does not live long enough
--> src/lib.rs:5:29
|
5 | let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
| ^^^^^^^^^^^^^^^^^
| |
| borrowed value does not live long enough
| argument requires that `b` is borrowed for `'static`
...
27 | }
| - `b` dropped here while still borrowed
error[E0499]: cannot borrow `*b2` as mutable more than once at a time
--> src/lib.rs:10:41
|
10 | let worker = std::thread::spawn(|| //(err2)
| - ^^ `*b2` was mutably borrowed here in the previous iteration of the loop
| ______________________|
| |
11 | | {
12 | | for i in 2..4
13 | | {
14 | | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
| | -- borrows occur due to use of `*b2` in closure
15 | | }
16 | | });
| |__________- argument requires that `*b2` is borrowed for `'static`
error[E0373]: closure may outlive the current function, but it borrows `a`, which is owned by the current function
--> src/lib.rs:10:41
|
10 | let worker = std::thread::spawn(|| //(err2)
| ^^ may outlive borrowed value `a`
...
14 | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
| - `a` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src/lib.rs:10:22
|
10 | let worker = std::thread::spawn(|| //(err2)
| ______________________^
11 | | {
12 | | for i in 2..4
13 | | {
14 | | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
15 | | }
16 | | });
| |__________^
help: to force the closure to take ownership of `a` (and any other referenced variables), use the `move` keyword
|
10 | let worker = std::thread::spawn(move || //(err2)
| ++++
error[E0499]: cannot borrow `b` as mutable more than once at a time
--> src/lib.rs:22:9
|
5 | let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
| ----------------- first mutable borrow occurs here
...
19 | b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
| ----- first borrow later used here
...
22 | b.sort(); //borrow b as mutable in main thread only (err3)
| ^^^^^^^^ second mutable borrow occurs here
error[E0502]: cannot borrow `a` as mutable because it is also borrowed as immutable
--> src/lib.rs:23:18
|
10 | let worker = std::thread::spawn(|| //(err2)
| - -- immutable borrow occurs here
| ______________________|
| |
11 | | {
12 | | for i in 2..4
13 | | {
14 | | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
| | - first borrow occurs due to use of `a` in closure
15 | | }
16 | | });
| |__________- argument requires that `a` is borrowed for `'static`
...
23 | for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
| ^^^^^^^^^^^^ mutable borrow occurs here
error[E0505]: cannot move out of `b` because it is borrowed
--> src/lib.rs:26:12
|
5 | let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
| -----------------
| |
| borrow of `b` occurs here
| argument requires that `b` is borrowed for `'static`
...
26 | return b;
| ^ move out of `b` occurs here
你在代码中做了一堆假设,rust borrow checker 无法在编译类型中确保这些假设:
- 当
parallel_fn
完成时,a
和 b
将被删除。您正在调用 .join()
以确保生成的线程应该在那个时候完成,但这是一个运行时检查,rust 无法在编译时检查它。
这就是为什么你不能传递对 spawn 的引用,所以你需要 arc
- 您正在调用
.join()
以确保在您修改它时不会发生对 a
的读取操作,但编译器无法检查它。
- 你在不同的线程中改变了同一个向量的部分,除非你格外小心,否则这是不安全的,所以你需要一些不安全的代码。替代方法是将向量拆分为 2 个向量(不是切片)并加入它们,但这是额外的复制。
我得到了这样的结果:
use std::sync::{Mutex, Arc, RwLock};
#[derive(Debug)]
struct VectorHolder {
vec: Vec<i32>,
slice1_ptr: Mutex<*mut [i32]>,
slice2_ptr: Mutex<*mut [i32]>,
}
impl VectorHolder {
pub fn new(mut vec: Vec<i32>) -> Self {
let (slice1 , slice2) = vec.split_at_mut(2);
let slice1_ptr = slice1 as *mut _;
let slice2_ptr = slice2 as *mut _;
Self {
vec,
slice1_ptr: Mutex::new(slice1_ptr),
slice2_ptr: Mutex::new(slice2_ptr),
}
}
/// Below operations are "safe" as no one else can access parts of Vec until
/// VectorHolder is consumed.
/// It is also safe we can't call op1 from 2 threads due to the mutex.
/// Mutex could optionally be removed if you are sure you never call op1 and op2 concurrently.
/// In this case we are sure about that as we have .join
pub fn op1(&self, a: &[i32]) {
let mut guard = self.slice2_ptr.lock().unwrap();
let b2 = unsafe { &mut **guard };
for i in 2..4 {
b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
}
}
pub fn op2(&self, a: &[i32]) {
let mut guard = self.slice1_ptr.lock().unwrap();
let b1 = unsafe { &mut **guard };
for i in 0..2 {
b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
}
}
pub fn consume(self) -> Vec<i32> {
self.vec
}
}
unsafe impl Send for VectorHolder { }
unsafe impl Sync for VectorHolder { }
pub fn operations_on_b(b: Vec<i32>, a: Arc<RwLock<Vec<i32>>>) -> Vec<i32> {
let holder = Arc::new(VectorHolder::new(b));
//only 1 worker for example purposes
let holder_clone = holder.clone();
let a_clone = a.clone();
let worker = std::thread::spawn(move || {
holder_clone.op1(a_clone.read().unwrap().as_slice());
});
holder.op2(a.read().unwrap().as_slice());
worker.join().unwrap(); //workers finish
let mut modified_b = Arc::try_unwrap(holder).unwrap().consume();
modified_b.sort();
modified_b
}
fn parallel_fn() -> Vec<i32>
{
let a = Arc::new(RwLock::new(vec![1,2,3,4]));
let mut b = vec![0;4];
for _repetitions in 0..100
{
b = operations_on_b(b, a.clone());
for e in a.write().unwrap().iter_mut() //borrow a as mutable in main thread only (err4)
{ *e += 1;}
}
return b;
}
fn main() {
println!("{:?}", parallel_fn());
}
我 运行 与借用检查器撞墙了,我想是因为它无法判断在主线程加入后工作人员不再使用的东西。我已将我的问题简化为相同模式的最简单示例。我试图避免不必要的分配和复制,尤其是 b
.
fn sequential_fn() -> Vec<i32>
{
let mut a = vec![1,2,3,4]; //two mutable arrays
let mut b = vec![0;4];
for repetitions in 0..100
{
for i in 0..4
{
b[i] ^= a[i]; //example heavy operation - a isnt modified but b is
}
b.sort(); //heavy operation over, b is sorted
for e in a.iter_mut() //a is modified by main thread before repeating process
{ *e += 1;}
}
return b; //return b
}
上面的顺序版本可以编译并且工作正常。这就是我试图并行化繁重的操作以拆分工作负载的方式:
fn parallel_fn() -> Vec<i32>
{
let mut a = vec![1,2,3,4];
let mut b = vec![0;4];
let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
for repetitions in 0..100
{
//only 1 worker for example purposes
let worker = std::thread::spawn(|| //(err2)
{
for i in 2..4
{
b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
}
});
for i in 0..2
{
b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
}
worker.join(); //workers finish
b.sort(); //borrow b as mutable in main thread only (err3)
for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
{ *e += 1;}
}
return b;
}
我遇到的错误是:
error[E0597]: `b` does not live long enough
--> src/lib.rs:5:29
|
5 | let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
| ^^^^^^^^^^^^^^^^^
| |
| borrowed value does not live long enough
| argument requires that `b` is borrowed for `'static`
...
27 | }
| - `b` dropped here while still borrowed
error[E0499]: cannot borrow `*b2` as mutable more than once at a time
--> src/lib.rs:10:41
|
10 | let worker = std::thread::spawn(|| //(err2)
| - ^^ `*b2` was mutably borrowed here in the previous iteration of the loop
| ______________________|
| |
11 | | {
12 | | for i in 2..4
13 | | {
14 | | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
| | -- borrows occur due to use of `*b2` in closure
15 | | }
16 | | });
| |__________- argument requires that `*b2` is borrowed for `'static`
error[E0373]: closure may outlive the current function, but it borrows `a`, which is owned by the current function
--> src/lib.rs:10:41
|
10 | let worker = std::thread::spawn(|| //(err2)
| ^^ may outlive borrowed value `a`
...
14 | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
| - `a` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src/lib.rs:10:22
|
10 | let worker = std::thread::spawn(|| //(err2)
| ______________________^
11 | | {
12 | | for i in 2..4
13 | | {
14 | | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
15 | | }
16 | | });
| |__________^
help: to force the closure to take ownership of `a` (and any other referenced variables), use the `move` keyword
|
10 | let worker = std::thread::spawn(move || //(err2)
| ++++
error[E0499]: cannot borrow `b` as mutable more than once at a time
--> src/lib.rs:22:9
|
5 | let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
| ----------------- first mutable borrow occurs here
...
19 | b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
| ----- first borrow later used here
...
22 | b.sort(); //borrow b as mutable in main thread only (err3)
| ^^^^^^^^ second mutable borrow occurs here
error[E0502]: cannot borrow `a` as mutable because it is also borrowed as immutable
--> src/lib.rs:23:18
|
10 | let worker = std::thread::spawn(|| //(err2)
| - -- immutable borrow occurs here
| ______________________|
| |
11 | | {
12 | | for i in 2..4
13 | | {
14 | | b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
| | - first borrow occurs due to use of `a` in closure
15 | | }
16 | | });
| |__________- argument requires that `a` is borrowed for `'static`
...
23 | for e in a.iter_mut() //borrow a as mutable in main thread only (err4)
| ^^^^^^^^^^^^ mutable borrow occurs here
error[E0505]: cannot move out of `b` because it is borrowed
--> src/lib.rs:26:12
|
5 | let (mut b1 , mut b2) = b.split_at_mut(2); //split b into slices for each worker (err1)
| -----------------
| |
| borrow of `b` occurs here
| argument requires that `b` is borrowed for `'static`
...
26 | return b;
| ^ move out of `b` occurs here
你在代码中做了一堆假设,rust borrow checker 无法在编译类型中确保这些假设:
- 当
parallel_fn
完成时,a
和b
将被删除。您正在调用.join()
以确保生成的线程应该在那个时候完成,但这是一个运行时检查,rust 无法在编译时检查它。 这就是为什么你不能传递对 spawn 的引用,所以你需要 arc - 您正在调用
.join()
以确保在您修改它时不会发生对a
的读取操作,但编译器无法检查它。 - 你在不同的线程中改变了同一个向量的部分,除非你格外小心,否则这是不安全的,所以你需要一些不安全的代码。替代方法是将向量拆分为 2 个向量(不是切片)并加入它们,但这是额外的复制。
我得到了这样的结果:
use std::sync::{Mutex, Arc, RwLock};
#[derive(Debug)]
struct VectorHolder {
vec: Vec<i32>,
slice1_ptr: Mutex<*mut [i32]>,
slice2_ptr: Mutex<*mut [i32]>,
}
impl VectorHolder {
pub fn new(mut vec: Vec<i32>) -> Self {
let (slice1 , slice2) = vec.split_at_mut(2);
let slice1_ptr = slice1 as *mut _;
let slice2_ptr = slice2 as *mut _;
Self {
vec,
slice1_ptr: Mutex::new(slice1_ptr),
slice2_ptr: Mutex::new(slice2_ptr),
}
}
/// Below operations are "safe" as no one else can access parts of Vec until
/// VectorHolder is consumed.
/// It is also safe we can't call op1 from 2 threads due to the mutex.
/// Mutex could optionally be removed if you are sure you never call op1 and op2 concurrently.
/// In this case we are sure about that as we have .join
pub fn op1(&self, a: &[i32]) {
let mut guard = self.slice2_ptr.lock().unwrap();
let b2 = unsafe { &mut **guard };
for i in 2..4 {
b2[i-2] ^= a[i] //mutably borrow b2, immutably borrow a
}
}
pub fn op2(&self, a: &[i32]) {
let mut guard = self.slice1_ptr.lock().unwrap();
let b1 = unsafe { &mut **guard };
for i in 0..2 {
b1[i] ^= a[i]; // mutably borrow b1, immutably borrow a
}
}
pub fn consume(self) -> Vec<i32> {
self.vec
}
}
unsafe impl Send for VectorHolder { }
unsafe impl Sync for VectorHolder { }
pub fn operations_on_b(b: Vec<i32>, a: Arc<RwLock<Vec<i32>>>) -> Vec<i32> {
let holder = Arc::new(VectorHolder::new(b));
//only 1 worker for example purposes
let holder_clone = holder.clone();
let a_clone = a.clone();
let worker = std::thread::spawn(move || {
holder_clone.op1(a_clone.read().unwrap().as_slice());
});
holder.op2(a.read().unwrap().as_slice());
worker.join().unwrap(); //workers finish
let mut modified_b = Arc::try_unwrap(holder).unwrap().consume();
modified_b.sort();
modified_b
}
fn parallel_fn() -> Vec<i32>
{
let a = Arc::new(RwLock::new(vec![1,2,3,4]));
let mut b = vec![0;4];
for _repetitions in 0..100
{
b = operations_on_b(b, a.clone());
for e in a.write().unwrap().iter_mut() //borrow a as mutable in main thread only (err4)
{ *e += 1;}
}
return b;
}
fn main() {
println!("{:?}", parallel_fn());
}