在 Rust 中使用多线程更改向量中的元素

Change elements in vector using multithreading in Rust

我是 Rust 的新手,我正在尝试将计算工作分配给线程。

我有字符串向量,我想为每个字符串创建一个线程来完成他的工作。有简单的代码:

use std::thread;

fn child_job(s: &mut String) {
    *s = s.to_uppercase();
}

fn main() {
    // initialize
    let mut thread_handles = vec![];
    let mut strings = vec![
        "hello".to_string(),
        "world".to_string(),
        "testing".to_string(),
        "good enough".to_string(),
    ];

    // create threads
    for s in &mut strings {
        thread_handles.push(thread::spawn(|| child_job(s)));
    }

    // wait for threads
    for handle in thread_handles {
        handle.join().unwrap();
    }

    // print result
    for s in strings {
        println!("{}", s);
    }
}

编译时出现错误:

error[E0597]: `strings` does not live long enough
  --> src/main.rs:18:14
   |
18 |     for s in &mut strings {
   |              ^^^^^^^^^^^^
   |              |
   |              borrowed value does not live long enough
   |              argument requires that `strings` is borrowed for `'static`
...
31 | }
   | - `strings` dropped here while still borrowed

error[E0505]: cannot move out of `strings` because it is borrowed
  --> src/main.rs:28:14
   |
18 |     for s in &mut strings {
   |              ------------
   |              |
   |              borrow of `strings` occurs here
   |              argument requires that `strings` is borrowed for `'static`
...
28 |     for s in strings {
   |              ^^^^^^^ move out of `strings` occurs here

我不明白指针的生命周期有什么问题以及我应该如何解决这个问题。对我来说它看起来不错,因为每个线程只获得一个可变的字符串指针并且不会以任何方式影响向量本身。

Rust 不知道你的字符串会和你的线程一样长,所以它不会将对它们的引用传递给线程。想象一下,如果您将对字符串的引用传递给另一个线程,那么原始线程认为它已经完成了对该字符串的处理并释放了它的内存。那会导致未定义的行为。 Rust 通过要求将字符串保存在 reference-counted 指针后面(确保它们的内存在它们仍然在某处被引用时不会被释放)或者它们具有 'static 生命周期来防止这种情况,这意味着它们是存储在可执行二进制文件本身中。

此外,Rust 不允许您跨线程共享可变引用,因为它不安全(多个线程可能会尝试同时更改引用的数据)。您想将 std::sync::Arcstd::sync::Mutex 结合使用。您的 strings 向量将变为 Vec<Arc<Mutex<String>>>。然后,您可以复制 Arc(使用 .clone())并跨线程发送。 Arc 是一个指针,用于保持引用计数自动递增(阅读:以 thread-safe 方式)。互斥量允许一个线程临时锁定字符串,这样其他线程就不能触及它,然后再解锁字符串(线程可以在锁定时安全地更改字符串)。

您的代码将如下所示:

use std::thread;
use std::sync::{Arc, Mutex};

fn child_job(s: Arc<Mutex<String>>) {
    // Lock s so other threads can't touch it. It will get
    // unlocked when it goes out of scope of this function.
    let mut s = s.lock().unwrap();
    *s = s.to_uppercase();
}

fn main() {
    // initialize
    let mut thread_handles = Vec::new();
    let strings = vec![
        Arc::new(Mutex::new("hello".to_string())),
        Arc::new(Mutex::new("world".to_string())),
        Arc::new(Mutex::new("testing".to_string())),
        Arc::new(Mutex::new("good enough".to_string())),
    ];

    // create threads
    for i in 0..strings.len() {
        let s = strings[i].clone();
        thread_handles.push(thread::spawn(|| child_job(s)));
    }

    // wait for threads
    for handle in thread_handles {
        handle.join().unwrap();
    }

    // print result
    for s in strings {
        let s = s.lock().unwrap();
        println!("{}", *s);
    }
}

对于 thread::spawnJoinHandles,借用检查器不够智能,无法知道您的线程将在 main 退出之前完成(这对借用有点不公平checker,它真的不知道),因此它不能证明 strings 会存在足够长的时间让你的线程在它上面工作。您可以像@tedtanner 建议的那样使用 Arcs 来回避这个问题(从某种意义上说,这意味着您在运行时进行生命周期管理),或者您可以使用作用域线程。

作用域线程本质上是一种告诉借用检查器的方式:是的,该线程将在该作用域结束(被删除)之前完成。然后,您可以将对当前线程堆栈中的内容的引用传递给另一个线程:

crossbeam::thread::scope(|scope| {
    for s in &mut strings {
        scope.spawn(|_| child_job(s));
    }
}) // All spawned threads are auto-joined here, no need for join_handles
.unwrap();

Playground

目前,您需要一个 crate(我建议 crossbeam),但此功能最终应该会成为标准。

Caesar 的回答显示了如何使用 crossbeam 的作用域线程解决问题。如果您不想依赖 crossbeam,那么将值包装在 Arc<Mutex<T>> 中的方法是一种合理的通用策略,如 tedtanner 的回答所示。

然而,在这种情况下,互斥量确实是不必要的,因为线程不共享字符串,无论是彼此之间还是与主线程。锁定是使用 Arc 的产物,它本身是由静态生命周期强制执行的,而不是共享的需要。尽管锁是无竞争的,但它们确实会增加一些开销,最好避免使用。在这种情况下,我们可以通过 将每个字符串移动 到其各自的线程,并在线程完成后检索修改后的字符串来避免 ArcMutex

此修改仅使用标准库和安全代码编译和运行,不需要 ArcMutex:

// ... child_job defined as in the question ...

fn main() {
    let strings = vec![
        "hello".to_string(),
        "world".to_string(),
        "testing".to_string(),
        "good enough".to_string(),
    ];

    // start the threads, giving them the strings
    let mut thread_handles = vec![];
    for mut s in strings {
        thread_handles.push(thread::spawn(move || {
            child_job(&mut s);
            s
        }));
    }

    // wait for threads and re-populate `strings`
    let strings = thread_handles.into_iter().map(|h| h.join().unwrap());

    // print result
    for s in strings {
        println!("{}", s);
    }
}

Playground