如何使用 mpsc 通道在线程之间创建环形通信?
How to create a ring communication between threads using mpsc channels?
我想生成 n 个线程,能够与环形拓扑中的其他线程通信,例如线程 0 可以向线程 1 发送消息,线程 1 可以向线程 2 发送消息,线程 n 可以向线程 0 发送消息。
这是我想要通过 n=3 实现的示例:
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
let (tx0, rx0): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx1, rx1): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let child0 = thread::spawn(move || {
tx0.send(0).unwrap();
println!("thread 0 sent: 0");
println!("thread 0 recv: {:?}", rx2.recv().unwrap());
});
let child1 = thread::spawn(move || {
tx1.send(1).unwrap();
println!("thread 1 sent: 1");
println!("thread 1 recv: {:?}", rx0.recv().unwrap());
});
let child2 = thread::spawn(move || {
tx2.send(2).unwrap();
println!("thread 2 sent: 2");
println!("thread 2 recv: {:?}", rx1.recv().unwrap());
});
child0.join();
child1.join();
child2.join();
在这里,我在一个循环中创建通道,将它们存储在一个向量中,对发送者重新排序,将它们存储在一个新的向量中,然后生成线程,每个线程都有自己的发送者-接收者(tx1/rx0,tx2/rx1, 等等) 对.
const NTHREADS: usize = 8;
// create n channels
let channels: Vec<(Sender<i32>, Receiver<i32>)> =
(0..NTHREADS).into_iter().map(|_| mpsc::channel()).collect();
// switch tupel entries for the senders to create ring topology
let mut channels_ring: Vec<(Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
.into_iter()
.map(|i| {
(
channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
channels[i].1,
)
})
.collect();
let mut children = Vec::new();
for i in 0..NTHREADS {
let (tx, rx) = channels_ring.remove(i);
let child = thread::spawn(move || {
tx.send(i).unwrap();
println!("thread {} sent: {}", i, i);
println!("thread {} recv: {:?}", i, rx.recv().unwrap());
});
children.push(child);
}
for child in children {
let _ = child.join();
}
这不起作用,因为无法复制 Sender 来创建新的向量。
但是,如果我使用 refs (& Sender):
let mut channels_ring: Vec<(&Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
.into_iter()
.map(|i| {
(
&channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
channels[i].1,
)
})
.collect();
我无法生成线程,因为 std::sync::mpsc::Sender<i32>
无法在线程之间安全共享。
This doesn't work, because Sender cannot be copied to create a new vector. However, if I use refs (& Sender):
虽然 Sender
确实无法复制,但它确实实现了 Clone
,因此您始终可以手动克隆它。但是这种方法不适用于 Receiver
,它不是 Clone
,您还需要从向量中提取它。
您的第一个代码的问题是您不能使用 let foo = vec[i]
将一个值从非 Copy
值的向量中移出。这将使向量处于无效状态,其中一个元素无效,随后对其进行访问将导致未定义的行为。为此,Vec
需要跟踪哪些元素已移动,哪些未移动,这将对所有 Vec
造成成本。因此,Vec
不允许将元素移出其中,将其留给用户跟踪移动。
将值移出 Vec
的一种简单方法是将 Vec<T>
替换为 Vec<Option<T>>
并使用 Option::take
. foo = vec[i]
is replaced with foo = vec[i].take().unwrap()
, which moves the T
value from the option in vec[i]
(while asserting that it's not None
) and leaves None
, a valid variant of Option<T>
, in the vector. Here is your first attempt modified in that manner (playground):
const NTHREADS: usize = 8;
let channels_ring: Vec<_> = {
let mut channels: Vec<_> = (0..NTHREADS)
.into_iter()
.map(|_| {
let (tx, rx) = mpsc::channel();
(Some(tx), Some(rx))
})
.collect();
(0..NTHREADS)
.into_iter()
.map(|rxpos| {
let txpos = if rxpos < NTHREADS - 1 { rxpos + 1 } else { 0 };
(
channels[txpos].0.take().unwrap(),
channels[rxpos].1.take().unwrap(),
)
})
.collect()
};
let children: Vec<_> = channels_ring
.into_iter()
.enumerate()
.map(|(i, (tx, rx))| {
thread::spawn(move || {
tx.send(i as i32).unwrap();
println!("thread {} sent: {}", i, i);
println!("thread {} recv: {:?}", i, rx.recv().unwrap());
})
})
.collect();
for child in children {
child.join().unwrap();
}
Sender
s 和 Receiver
s 无法共享,因此您需要将它们移动 到各自的线程中。这意味着将它们从 Vec
中移除,或者在迭代它时消耗 Vec
- 向量不允许处于无效状态(有孔),即使作为中间步骤也是如此。使用 into_iter
迭代向量将通过使用它们来实现。
一个可以让发送者和接收者在一个循环中配对的小技巧是创建两个向量;发送者之一和接收者之一;然后旋转一个,以便每个向量中的相同索引将为您提供所需的对。
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
fn main() {
const NTHREADS: usize = 8;
// create n channels
let (mut senders, receivers): (Vec<Sender<i32>>, Vec<Receiver<i32>>) =
(0..NTHREADS).into_iter().map(|_| mpsc::channel()).unzip();
// move the first sender to the back
senders.rotate_left(1);
let children: Vec<_> = senders
.into_iter()
.zip(receivers.into_iter())
.enumerate()
.map(|(i, (tx, rx))| {
thread::spawn(move || {
tx.send(i as i32).unwrap();
println!("thread {} sent: {}", i, i);
println!("thread {} recv: {:?}", i, rx.recv().unwrap());
})
})
.collect();
for child in children {
let _ = child.join();
}
}
我想生成 n 个线程,能够与环形拓扑中的其他线程通信,例如线程 0 可以向线程 1 发送消息,线程 1 可以向线程 2 发送消息,线程 n 可以向线程 0 发送消息。
这是我想要通过 n=3 实现的示例:
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
let (tx0, rx0): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx1, rx1): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let child0 = thread::spawn(move || {
tx0.send(0).unwrap();
println!("thread 0 sent: 0");
println!("thread 0 recv: {:?}", rx2.recv().unwrap());
});
let child1 = thread::spawn(move || {
tx1.send(1).unwrap();
println!("thread 1 sent: 1");
println!("thread 1 recv: {:?}", rx0.recv().unwrap());
});
let child2 = thread::spawn(move || {
tx2.send(2).unwrap();
println!("thread 2 sent: 2");
println!("thread 2 recv: {:?}", rx1.recv().unwrap());
});
child0.join();
child1.join();
child2.join();
在这里,我在一个循环中创建通道,将它们存储在一个向量中,对发送者重新排序,将它们存储在一个新的向量中,然后生成线程,每个线程都有自己的发送者-接收者(tx1/rx0,tx2/rx1, 等等) 对.
const NTHREADS: usize = 8;
// create n channels
let channels: Vec<(Sender<i32>, Receiver<i32>)> =
(0..NTHREADS).into_iter().map(|_| mpsc::channel()).collect();
// switch tupel entries for the senders to create ring topology
let mut channels_ring: Vec<(Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
.into_iter()
.map(|i| {
(
channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
channels[i].1,
)
})
.collect();
let mut children = Vec::new();
for i in 0..NTHREADS {
let (tx, rx) = channels_ring.remove(i);
let child = thread::spawn(move || {
tx.send(i).unwrap();
println!("thread {} sent: {}", i, i);
println!("thread {} recv: {:?}", i, rx.recv().unwrap());
});
children.push(child);
}
for child in children {
let _ = child.join();
}
这不起作用,因为无法复制 Sender 来创建新的向量。 但是,如果我使用 refs (& Sender):
let mut channels_ring: Vec<(&Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
.into_iter()
.map(|i| {
(
&channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
channels[i].1,
)
})
.collect();
我无法生成线程,因为 std::sync::mpsc::Sender<i32>
无法在线程之间安全共享。
This doesn't work, because Sender cannot be copied to create a new vector. However, if I use refs (& Sender):
虽然 Sender
确实无法复制,但它确实实现了 Clone
,因此您始终可以手动克隆它。但是这种方法不适用于 Receiver
,它不是 Clone
,您还需要从向量中提取它。
您的第一个代码的问题是您不能使用 let foo = vec[i]
将一个值从非 Copy
值的向量中移出。这将使向量处于无效状态,其中一个元素无效,随后对其进行访问将导致未定义的行为。为此,Vec
需要跟踪哪些元素已移动,哪些未移动,这将对所有 Vec
造成成本。因此,Vec
不允许将元素移出其中,将其留给用户跟踪移动。
将值移出 Vec
的一种简单方法是将 Vec<T>
替换为 Vec<Option<T>>
并使用 Option::take
. foo = vec[i]
is replaced with foo = vec[i].take().unwrap()
, which moves the T
value from the option in vec[i]
(while asserting that it's not None
) and leaves None
, a valid variant of Option<T>
, in the vector. Here is your first attempt modified in that manner (playground):
const NTHREADS: usize = 8;
let channels_ring: Vec<_> = {
let mut channels: Vec<_> = (0..NTHREADS)
.into_iter()
.map(|_| {
let (tx, rx) = mpsc::channel();
(Some(tx), Some(rx))
})
.collect();
(0..NTHREADS)
.into_iter()
.map(|rxpos| {
let txpos = if rxpos < NTHREADS - 1 { rxpos + 1 } else { 0 };
(
channels[txpos].0.take().unwrap(),
channels[rxpos].1.take().unwrap(),
)
})
.collect()
};
let children: Vec<_> = channels_ring
.into_iter()
.enumerate()
.map(|(i, (tx, rx))| {
thread::spawn(move || {
tx.send(i as i32).unwrap();
println!("thread {} sent: {}", i, i);
println!("thread {} recv: {:?}", i, rx.recv().unwrap());
})
})
.collect();
for child in children {
child.join().unwrap();
}
Sender
s 和 Receiver
s 无法共享,因此您需要将它们移动 到各自的线程中。这意味着将它们从 Vec
中移除,或者在迭代它时消耗 Vec
- 向量不允许处于无效状态(有孔),即使作为中间步骤也是如此。使用 into_iter
迭代向量将通过使用它们来实现。
一个可以让发送者和接收者在一个循环中配对的小技巧是创建两个向量;发送者之一和接收者之一;然后旋转一个,以便每个向量中的相同索引将为您提供所需的对。
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
fn main() {
const NTHREADS: usize = 8;
// create n channels
let (mut senders, receivers): (Vec<Sender<i32>>, Vec<Receiver<i32>>) =
(0..NTHREADS).into_iter().map(|_| mpsc::channel()).unzip();
// move the first sender to the back
senders.rotate_left(1);
let children: Vec<_> = senders
.into_iter()
.zip(receivers.into_iter())
.enumerate()
.map(|(i, (tx, rx))| {
thread::spawn(move || {
tx.send(i as i32).unwrap();
println!("thread {} sent: {}", i, i);
println!("thread {} recv: {:?}", i, rx.recv().unwrap());
})
})
.collect();
for child in children {
let _ = child.join();
}
}