多个线程如何共享一个迭代器?

How can multiple threads share an iterator?

我一直在研究一个函数,该函数将使用 Rust 和线程将一堆文件从源复制到目标。我在让线程共享迭代器时遇到了一些麻烦。还不习惯借阅系统:

extern crate libc;
extern crate num_cpus;

use libc::{c_char, size_t};
use std::thread;
use std::fs::copy;

fn python_str_array_2_str_vec<T, U, V>(_: T, _: U) -> V {
    unimplemented!()
}

#[no_mangle]
pub extern "C" fn copyFiles(
    sources: *const *const c_char,
    destinies: *const *const c_char,
    array_len: size_t,
) {
    let src: Vec<&str> = python_str_array_2_str_vec(sources, array_len);
    let dst: Vec<&str> = python_str_array_2_str_vec(destinies, array_len);
    let mut iter = src.iter().zip(dst);
    let num_threads = num_cpus::get();
    let threads = (0..num_threads).map(|_| {
        thread::spawn(|| while let Some((s, d)) = iter.next() {
            copy(s, d);
        })
    });
    for t in threads {
        t.join();
    }
}

fn main() {}

我遇到无法解决的编译错误:

error[E0597]: `src` does not live long enough
  --> src/main.rs:20:20
   |
20 |     let mut iter = src.iter().zip(dst);
   |                    ^^^ does not live long enough
...
30 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

error[E0373]: closure may outlive the current function, but it borrows `**iter`, which is owned by the current function
  --> src/main.rs:23:23
   |
23 |         thread::spawn(|| while let Some((s, d)) = iter.next() {
   |                       ^^                          ---- `**iter` is borrowed here
   |                       |
   |                       may outlive borrowed value `**iter`
   |
help: to force the closure to take ownership of `**iter` (and any other referenced variables), use the `move` keyword, as shown:
   |         thread::spawn(move || while let Some((s, d)) = iter.next() {

我已经看过以下问题:

我没有使用 chunks,我想尝试通过线程共享一个 迭代器 ,尽管创建块以将它们传递给线程将是经典的解决方案。

我已经看到一些使用通道与线程通信的答案,但我不太确定是否使用它们。应该有一种更简单的方法来通过线程共享一个对象。

这引起了我的注意,scoped 应该可以修复我的错误,但是由于它在不稳定的通道中,我想看看是否有另一种方法可以只使用 spawn.

有人可以解释我应该如何修复生命周期以便可以从线程访问迭代器吗?

这是您的问题的 minimal, reproducible example

use std::thread;

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];
    let mut iter = src.iter().zip(dst);
    thread::spawn(|| {
        while let Some((s, d)) = iter.next() {
            println!("{} -> {}", s, d);
        }
    });
}

存在多个相关问题:

  1. 迭代器存在于堆栈中,线程的闭包引用它。
  2. 闭包采用 mutable 对迭代器的引用。
  3. 迭代器本身有一个对驻留在堆栈中的 Vec 的引用。
  4. Vec 本身引用了字符串切片,这些切片 可能 存在于堆栈中,但不能保证在任何一种情况下都比线程存在的时间更长。

换句话说,Rust 编译器已阻止您执行四个独立的内存不安全部分。

要认识到的一个主要问题是,您生成的任何线程 可能 比您生成它的地方更长寿。即使您立即调用 join,编译器也无法静态验证是否会发生,因此它必须采取保守的路径。这就是 scoped threads 的要点——它们 保证 线程在它们启动的堆栈帧之前退出。

此外,您正试图在多个并发线程中使用可变引用。 zero 保证可以并行安全地调用迭代器(或它所构建的任何迭代器)。两个线程完全有可能同时在 调用 next。两段代码运行并行写入同一个内存地址。一个线程写入一半数据,另一个线程写入另一半,现在您的程序在未来的某个任意点崩溃。

使用像 crossbeam 这样的工具,您的代码将类似于:

use crossbeam; // 0.7.3

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];

    let mut iter = src.iter().zip(dst);
    while let Some((s, d)) = iter.next() {
        crossbeam::scope(|scope| {
            scope.spawn(|_| {
                println!("{} -> {}", s, d);
            });
        })
        .unwrap();
    }
}

如前所述,这一次只会产生一个线程,等待它完成。获得更多并行性的替代方法(本练习的常用点)是交换对 nextspawn 的调用。这需要通过 move 关键字将 sd 的所有权转移到线程:

use crossbeam; // 0.7.3

fn main() {
    let src = vec!["one", "alpha"];
    let dst = vec!["two", "beta"];

    let mut iter = src.iter().zip(dst);
    crossbeam::scope(|scope| {
        while let Some((s, d)) = iter.next() {
            scope.spawn(move |_| {
                println!("{} -> {}", s, d);
            });
        }
    })
    .unwrap();
}

如果在 spawn 中添加睡眠调用,您可以看到线程 运行 并行。

我会使用 for 循环来编写它,但是:

let iter = src.iter().zip(dst);
crossbeam::scope(|scope| {
    for (s, d) in iter {
        scope.spawn(move |_| {
            println!("{} -> {}", s, d);
        });
    }
}).unwrap();

最后,迭代器在当前线程上执行,然后从迭代器返回的每个值都交给一个新线程。保证新线程在捕获引用之前退出。

您可能对 Rayon 感兴趣,这是一个可以轻松并行化某些类型的迭代器的 crate。

另请参阅:

  • How can I pass a reference to a stack variable to a thread?
  • Cannot call a function in a spawned thread because it "does not fulfill the required lifetime"