带有 Rust `Sender` 和 Rayon 的 `for_each_with()` 的嵌套循环
Nested loops with a Rust `Sender` and the Rayon's `for_each_with()`
我有一些涉及使用嵌套循环的多线程代码,其中内部循环是 运行 并行的。每个线程的“共享”是一个 Sender
,将 return 结果。值得注意的是 Sender
实现了 Send
,因此克隆它并使用 Rayon 的 for_each_with()
发送它应该没有问题。然而,编译这段代码:
use std::sync::mpsc::channel;
use rayon::prelude::*;
fn main(){
let (sender, receiver) = channel();
(0..5).for_each(|i|{
(0..5).into_par_iter().for_each_with(&sender, |sender, j|{
sender.send(i + j).unwrap();
});
});
}
给我:
8 | (0..5).into_par_iter().for_each_with(&sender, |sender, j|{
| ^^^^^^^ `Sender<_>` cannot be shared between threads safely
|
= help: the trait `Sync` is not implemented for `Sender<_>`
= note: required because of the requirements on the impl of `Send` for `&Sender<_>`
现在我在想这可能是因为我正在尝试克隆一个引用,但是如果我将实际的 sender
移动到 for_each_with()
(即 for_each_with(sender, ...)
),它将被外循环的第一次迭代消耗:
error[E0507]: cannot move out of `sender`, a captured variable in an `FnMut` closure
(Playground).
如何以满足 Rust 编译器的方式实现此模式?
Sender
是用来克隆的,字面意思是 in the docs。每个子线程都必须有自己的发送者。
AFAIK,Rayon 使用线程池,这意味着 rayon 需要项目来实现 Send
因为它会将它发送到线程。人造丝不会为每个项目克隆,而是为每个线程克隆:
fn for_each_with<OP, T>(self, init: T, op: OP) where
OP: Fn(&mut T, Self::Item) + Sync + Send,
T: Send + Clone,
我们可以看到 OP
取 &mut T
而不是 T
。这意味着 for_each_with()
为每个使用的线程数量克隆,而不是为项目生产的数量克隆。
Reference need to implement Sync
to implement Send
. Sender
is define to not implement Sync
。我不知道这个选择的细节,这意味着 &Sender
不能在线程之间共享。我认为没有消除此约束的解决方案。
但如果你愿意,你可以使用 crossbeam-channel 实现 Sync
:
use crossbeam_channel::unbounded; // 0.5.1
use rayon::prelude::*; // 1.5.1
fn main() {
let (sender, _receiver) = unbounded();
for i in 0..5 {
(0..5).into_par_iter().for_each_with(&sender, |sender, j| {
sender.send(i + j).unwrap();
});
}
}
可以正常编译。好处是 crossbeam-channel 声称速度更快。也就是说,克隆对于 std Sender
:
完全没问题
use rayon::prelude::*;
use std::sync::mpsc::channel;
fn main() {
let (sender, _receiver) = channel();
for i in 0..5 {
let sender = sender.clone();
(0..5).into_par_iter().for_each_with(sender, |sender, j| {
sender.send(i + j).unwrap();
});
}
}
那确实会克隆 O(n)
次,但是来自 std 的 Sender
意味着要克隆很多。 (实际上,它可能只是添加一个克隆,因为您选择进行嵌套循环,代码可能不会克隆最后一个,而只是给它最后一个线程,因此您可以尽可能多地克隆一个 test )
无论如何,你所有的问题都来自一个奇怪的情况,应该像这样扁平化迭代:
use rayon::prelude::*;
use std::sync::mpsc::channel;
fn main() {
let (sender, _recever) = channel();
(0..5)
.into_par_iter()
.flat_map_iter(|i| (0..5).map(move |j| (i, j)))
.for_each_with(sender, |sender, (i, j)| {
sender.send(i + j).unwrap();
});
}
您也可以考虑不使用 Sender
人造丝可能意味着与 collect()
一起使用,而不是发送物品,您可以在最后收集它们。
我有一些涉及使用嵌套循环的多线程代码,其中内部循环是 运行 并行的。每个线程的“共享”是一个 Sender
,将 return 结果。值得注意的是 Sender
实现了 Send
,因此克隆它并使用 Rayon 的 for_each_with()
发送它应该没有问题。然而,编译这段代码:
use std::sync::mpsc::channel;
use rayon::prelude::*;
fn main(){
let (sender, receiver) = channel();
(0..5).for_each(|i|{
(0..5).into_par_iter().for_each_with(&sender, |sender, j|{
sender.send(i + j).unwrap();
});
});
}
给我:
8 | (0..5).into_par_iter().for_each_with(&sender, |sender, j|{
| ^^^^^^^ `Sender<_>` cannot be shared between threads safely
|
= help: the trait `Sync` is not implemented for `Sender<_>`
= note: required because of the requirements on the impl of `Send` for `&Sender<_>`
现在我在想这可能是因为我正在尝试克隆一个引用,但是如果我将实际的 sender
移动到 for_each_with()
(即 for_each_with(sender, ...)
),它将被外循环的第一次迭代消耗:
error[E0507]: cannot move out of `sender`, a captured variable in an `FnMut` closure
(Playground).
如何以满足 Rust 编译器的方式实现此模式?
Sender
是用来克隆的,字面意思是 in the docs。每个子线程都必须有自己的发送者。
AFAIK,Rayon 使用线程池,这意味着 rayon 需要项目来实现 Send
因为它会将它发送到线程。人造丝不会为每个项目克隆,而是为每个线程克隆:
fn for_each_with<OP, T>(self, init: T, op: OP) where
OP: Fn(&mut T, Self::Item) + Sync + Send,
T: Send + Clone,
我们可以看到 OP
取 &mut T
而不是 T
。这意味着 for_each_with()
为每个使用的线程数量克隆,而不是为项目生产的数量克隆。
Reference need to implement Sync
to implement Send
. Sender
is define to not implement Sync
。我不知道这个选择的细节,这意味着 &Sender
不能在线程之间共享。我认为没有消除此约束的解决方案。
但如果你愿意,你可以使用 crossbeam-channel 实现 Sync
:
use crossbeam_channel::unbounded; // 0.5.1
use rayon::prelude::*; // 1.5.1
fn main() {
let (sender, _receiver) = unbounded();
for i in 0..5 {
(0..5).into_par_iter().for_each_with(&sender, |sender, j| {
sender.send(i + j).unwrap();
});
}
}
可以正常编译。好处是 crossbeam-channel 声称速度更快。也就是说,克隆对于 std Sender
:
use rayon::prelude::*;
use std::sync::mpsc::channel;
fn main() {
let (sender, _receiver) = channel();
for i in 0..5 {
let sender = sender.clone();
(0..5).into_par_iter().for_each_with(sender, |sender, j| {
sender.send(i + j).unwrap();
});
}
}
那确实会克隆 O(n)
次,但是来自 std 的 Sender
意味着要克隆很多。 (实际上,它可能只是添加一个克隆,因为您选择进行嵌套循环,代码可能不会克隆最后一个,而只是给它最后一个线程,因此您可以尽可能多地克隆一个 test )
无论如何,你所有的问题都来自一个奇怪的情况,应该像这样扁平化迭代:
use rayon::prelude::*;
use std::sync::mpsc::channel;
fn main() {
let (sender, _recever) = channel();
(0..5)
.into_par_iter()
.flat_map_iter(|i| (0..5).map(move |j| (i, j)))
.for_each_with(sender, |sender, (i, j)| {
sender.send(i + j).unwrap();
});
}
您也可以考虑不使用 Sender
人造丝可能意味着与 collect()
一起使用,而不是发送物品,您可以在最后收集它们。