What is an efficient pattern for concurrently processing an unknown but finite number of jobs?
When multiple jobs communicate with a parent thread via mpsc, there are various strategies for program termination. When the last job is known, as in this example from the book, we can move the channel transmitter into the last thread and then iterate over the receiver. The iteration will terminate after all jobs have terminated because there will be no remaining references to the transmitter. Similarly, if we know the number of jobs n_jobs upfront, as in this example from the threadpool docs, we can explicitly take n_jobs results from the receiver and process them. Finally, in a server application, we can simply listen on the receiver forever and avoid detecting a termination condition altogether.
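To make the first two strategies concrete, here is a minimal sketch using only the standard library. The function names sum_until_disconnected and sum_known_count and the squared-id workload are invented for illustration. In the first function the original transmitter is dropped explicitly rather than moved into the last thread, which should have the same effect on termination.

```rust
use std::sync::mpsc;
use std::thread;

/// Strategy 1: make sure no transmitter outlives the workers,
/// so that iterating over the receiver ends on its own.
fn sum_until_disconnected(n_jobs: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    for id in 0..n_jobs {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id * id).expect("receiver is alive");
        });
    }
    // Drop the original transmitter; each clone is dropped when its worker exits.
    drop(tx);
    rx.iter().sum()
}

/// Strategy 2: the job count is known, so take exactly that many results.
fn sum_known_count(n_jobs: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    for id in 0..n_jobs {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id * id).expect("receiver is alive");
        });
    }
    // `tx` is still alive here, so `take` is what bounds the iteration.
    rx.iter().take(n_jobs).sum()
}

fn main() {
    println!("{}", sum_until_disconnected(4));
    println!("{}", sum_known_count(4));
}
```

Neither approach carries over directly when jobs can create further jobs, because the parent must keep a transmitter to hand to new workers and does not know the final count.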
However, when we need to process an unknown but finite number of jobs, it is less clear what to do.
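As a concrete example, suppose we have a graph with a known starting vertex start and an initially unknown, finite number of vertices reachable from start. We spawn a worker to walk the graph from start, for instance to compute shortest paths to the reachable vertices. Each time the worker discovers a previously unseen reachable vertex reachable, it sends reachable back to the main thread over the channel. Meanwhile, the main thread listens on the channel and spawns a new worker from the pool whenever it receives reachable. Eventually, all reachable vertices are found and processed, and the program terminates.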
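What is an efficient pattern for the main thread to listen on the channel and terminate once the work is done? The best I could come up with is a busy loop that checks the receiver, checks the worker count (maintained in a Mutex), and then sleeps or terminates as appropriate, as in the following contrived example: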
    use rand;
    use std::sync::mpsc::{self, Sender};
    use std::sync::{Arc, Mutex, MutexGuard};
    use std::thread;
    use std::time::Duration;

    fn main() {
        let (tx, rx) = mpsc::channel();
        // Number of workers that have been spawned but have not yet finished.
        let count_arc = Arc::new(Mutex::new(0));
        count_up(0, &count_arc, count_arc.lock().unwrap(), &tx);
        loop {
            {
                // Hold the lock while polling so the count cannot change in between.
                let count = count_arc.lock().unwrap();
                let result = rx.try_recv();
                if let Ok(val) = result {
                    println!("{}", val);
                    count_up(val, &count_arc, count, &tx);
                } else if *count == 0 {
                    // Nothing queued and no worker running: all jobs are done.
                    break;
                }
            }
            thread::sleep(Duration::from_millis(10));
        }
    }

    fn count_up(
        from: usize,
        count_arc: &Arc<Mutex<usize>>,
        mut count: MutexGuard<usize>,
        tx: &Sender<usize>,
    ) {
        let count_arc = count_arc.clone();
        // Register the worker before spawning it, while the caller's lock is held.
        *count += 1;
        let tx = tx.clone();
        thread::spawn(move || {
            let cap: u8 = rand::random();
            if from < cap as usize {
                tx.send(from + 1).expect("message was sent");
            }
            // Decrement only after sending, so the parent never sees zero too early.
            *count_arc.lock().unwrap() -= 1;
        });
    }
Another, simpler option is to spawn threads recursively, like this:
    use std::sync::mpsc::{self, Sender};
    use std::thread;

    fn main() {
        let (tx, rx) = mpsc::channel();
        // Ownership of the only transmitter moves into the chain of workers.
        count_up(0, tx);
        // Ends once the last worker drops the transmitter.
        for msg in rx {
            println!("{}", msg);
        }
    }

    fn count_up(from: usize, tx: Sender<usize>) {
        thread::spawn(move || {
            let cap: u8 = rand::random();
            if from < cap as usize {
                tx.send(from + 1).expect("message was sent");
                // Hand the transmitter on to the next worker.
                count_up(from + 1, tx);
            }
        });
    }
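However, I prefer the idea of a manager parent over recursive children.
To avoid angering anyone with the unbounded appearance of my examples, note that I would of course use a thread pool in any real code.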
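I think @Jmb's comment about sending structured messages was the insight I was missing in the busy-loop example, so I'll turn it into an answer here. Rewritten with structured messages and iterating over the receiver, the example might look like this: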
    use rand;
    use std::sync::mpsc::{self, Sender};
    use std::thread;

    enum Message {
        NewJob(usize),
        Completed,
    }
    use Message::*;

    fn main() {
        let (tx, rx) = mpsc::channel();
        // One worker is about to be spawned, so the count starts at 1.
        let mut worker_count = 1usize;
        count_up(0, &tx);
        for msg in rx {
            match msg {
                NewJob(from) => {
                    println!("{}", from);
                    worker_count += 1;
                    count_up(from, &tx);
                }
                Completed => worker_count -= 1,
            }
            // `tx` is still alive in this scope, so we must break explicitly.
            if worker_count == 0 {
                break;
            }
        }
    }

    fn count_up(from: usize, tx: &Sender<Message>) {
        let tx = tx.clone();
        thread::spawn(move || {
            let cap: u8 = rand::random();
            if from < cap as usize {
                tx.send(NewJob(from + 1)).expect("message was sent");
            }
            // Always sent last, so any NewJob from this worker arrives first.
            tx.send(Completed).expect("message was sent");
        });
    }
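For the graph example from the question, the same pattern might be sketched as follows. This is only an illustration under some assumptions: the graph is an adjacency list in a HashMap, the names Graph, Discovered, visit, reachable and sample_graph are made up, the set of seen vertices lives in the main thread, and each job gets a plain thread instead of a pooled worker.

```rust
use std::collections::{BTreeSet, HashMap};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;

// Hypothetical graph representation: vertex -> outgoing neighbours.
type Graph = HashMap<u32, Vec<u32>>;

enum Message {
    Discovered(u32),
    Completed,
}

/// Spawns one worker that reports every neighbour of `vertex`, then signals completion.
fn visit(vertex: u32, graph: &Arc<Graph>, tx: &Sender<Message>) {
    let graph = Arc::clone(graph);
    let tx = tx.clone();
    thread::spawn(move || {
        for &next in graph.get(&vertex).into_iter().flatten() {
            tx.send(Message::Discovered(next)).expect("manager is listening");
        }
        // Sent last, so all of this worker's discoveries are queued before it.
        tx.send(Message::Completed).expect("manager is listening");
    });
}

/// Returns every vertex reachable from `start`, including `start` itself.
fn reachable(graph: Graph, start: u32) -> BTreeSet<u32> {
    let graph = Arc::new(graph);
    let (tx, rx) = mpsc::channel();
    let mut seen = BTreeSet::new();
    seen.insert(start);
    let mut active = 1usize; // workers spawned but not yet completed
    visit(start, &graph, &tx);
    for msg in rx {
        match msg {
            Message::Discovered(v) => {
                // Only previously unseen vertices create a new job.
                if seen.insert(v) {
                    active += 1;
                    visit(v, &graph, &tx);
                }
            }
            Message::Completed => active -= 1,
        }
        if active == 0 {
            break;
        }
    }
    seen
}

/// A small sample graph; vertex 4 is not reachable from vertex 0.
fn sample_graph() -> Graph {
    vec![
        (0, vec![1, 2]),
        (1, vec![2, 3]),
        (2, vec![0]),
        (3, vec![]),
        (4, vec![0]),
    ]
    .into_iter()
    .collect()
}

fn main() {
    println!("{:?}", reachable(sample_graph(), 0));
}
```

Keeping the seen set in the manager means it needs no lock, at the cost of workers occasionally reporting a vertex that is already known.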