带有 Rust `Sender` 和 Rayon 的 `for_each_with()` 的嵌套循环

Nested loops with a Rust `Sender` and the Rayon's `for_each_with()`

我有一些涉及使用嵌套循环的多线程代码,其中内部循环是 运行 并行的。每个线程的“共享”是一个 Sender,将 return 结果。值得注意的是 Sender 实现了 Send,因此克隆它并使用 Rayon 的 for_each_with() 发送它应该没有问题。然而,编译这段代码:

use std::sync::mpsc::channel;
use rayon::prelude::*;

fn main(){
    let (sender, receiver) = channel();

    (0..5).for_each(|i|{
        (0..5).into_par_iter().for_each_with(&sender, |sender, j|{
            sender.send(i + j).unwrap();   
        });
    });
}

给我:

8 |         (0..5).into_par_iter().for_each_with(&sender, |sender, j|{
  |                                              ^^^^^^^ `Sender<_>` cannot be shared between threads safely
  |
  = help: the trait `Sync` is not implemented for `Sender<_>`
  = note: required because of the requirements on the impl of `Send` for `&Sender<_>`

(Playground)

现在我在想这可能是因为我正在尝试克隆一个引用,但是如果我将实际的 sender 移动到 for_each_with()(即 for_each_with(sender, ...)),它将被外循环的第一次迭代消耗:

error[E0507]: cannot move out of `sender`, a captured variable in an `FnMut` closure

(Playground).

如何以满足 Rust 编译器的方式实现此模式?

Sender 是用来克隆的,字面意思是 in the docs。每个子线程都必须有自己的发送者。

AFAIK,Rayon 使用线程池,这意味着 rayon 需要项目来实现 Send 因为它会将它发送到线程。人造丝不会为每个项目克隆,而是为每个线程克隆:

fn for_each_with<OP, T>(self, init: T, op: OP) where
    OP: Fn(&mut T, Self::Item) + Sync + Send,
    T: Send + Clone,

我们可以看到 OP&mut T 而不是 T。这意味着 for_each_with() 为每个使用的线程数量克隆,而不是为项目生产的数量克隆。

Reference need to implement Sync to implement Send. Sender is define to not implement Sync。我不知道这个选择的细节,这意味着 &Sender 不能在线程之间共享。我认为没有消除此约束的解决方案。

但如果你愿意,你可以使用 crossbeam-channel 实现 Sync:

use crossbeam_channel::unbounded; // 0.5.1
use rayon::prelude::*; // 1.5.1

fn main() {
    let (sender, _receiver) = unbounded();

    for i in 0..5 {
        (0..5).into_par_iter().for_each_with(&sender, |sender, j| {
            sender.send(i + j).unwrap();
        });
    }
}

可以正常编译。好处是 crossbeam-channel 声称速度更快。也就是说,克隆对于 std Sender:

完全没问题
use rayon::prelude::*;
use std::sync::mpsc::channel;

fn main() {
    let (sender, _receiver) = channel();

    for i in 0..5 {
        let sender = sender.clone();
        (0..5).into_par_iter().for_each_with(sender, |sender, j| {
            sender.send(i + j).unwrap();
        });
    }
}

那确实会克隆 O(n) 次,但是来自 std 的 Sender 意味着要克隆很多。 (实际上,它可能只是添加一个克隆,因为您选择进行嵌套循环,代码可能不会克隆最后一个,而只是给它最后一个线程,因此您可以尽可能多地克隆一个 test )

无论如何,你所有的问题都来自一个奇怪的情况,应该像这样扁平化迭代:

use rayon::prelude::*;
use std::sync::mpsc::channel;

fn main() {
    let (sender, _recever) = channel();

    (0..5)
        .into_par_iter()
        .flat_map_iter(|i| (0..5).map(move |j| (i, j)))
        .for_each_with(sender, |sender, (i, j)| {
            sender.send(i + j).unwrap();
        });
}

您也可以考虑不使用 Sender 人造丝可能意味着与 collect() 一起使用,而不是发送物品,您可以在最后收集它们。