使用 crossbeam 通道和作用域线程的多线程代码无休止地卡住

Multi-threaded code that uses crossbeam channel and scoped threads stuck endlessly

我有以下代码使用容量 (20) 小于我想通过交叉束通道发送的数据总量 (32) 的有界通道。我的目标是使用多个发送线程 (8) 和一定数量 (4) 的数字,每个线程通过交叉束通道发送给单个接收者,并使所有这些并行发生以优化效率。这只是我试图解决的一个更大问题的一个小原型。但是,我的代码导致程序无休止地卡住,永远不会 exit/timeout。我也知道它为什么会发生 - r.iter() 阻塞直到发件人被丢弃,这只发生在范围之外。我尝试了各种无效的方法:

  1. 用每个线程克隆发件人然后删除它们(如您在评论中所见)

  2. 接收器代码在范围之外,但这只是让最终向量包含 20 个长度而不是所需的 32 个长度。

fn main() {
    use crossbeam_channel::{unbounded, bounded};
    use crossbeam_utils::thread::scope;
    use itertools::Itertools;

    let (s, r) = bounded(20);
    let mut v = vec![];

    scope(|scope| {
        scope.spawn(|_| {
            for data in r.iter() {
                v.push(data);
            }
        });
        let _sender_threads = (0..8)
            .into_iter()
            .map(|_| {
                scope.spawn(|_| {
                    // let s_clone = s.clone();
                    for i in 0..4 {
                        // std::thread::sleep(std::time::Duration::from_millis(100));
                        match s.send(i) {
                            Ok(_) => {
                                // println!("sent i {:?}", i);
                                ()
                            },
                            Err(_)=> {
                                // println!("error sending i {:?}", i);
                                ()
                            }
                        };
                    }
                    // drop(s_clone);
                })
            })
            .collect_vec();
    }).expect("scope error.");
    drop(s);
    println!("{:?}", v);
}

发生这种情况是因为 s 在范围结束之前不会被删除,但是在所有线程退出之前范围不会结束,并且调用 r.iter() 的线程直到s 被丢弃。这是典型的死锁场景。

您需要将 s 放入范围内,但您只能在发送方线程全部退出后才能这样做,因此您不能像目前这样 drop(s); 在范围内写了。

解决此问题的最简单方法是为每个发送方线程克隆 s 并将克隆移动到线程的闭包中,然后将 s 放入主范围。

fn main() {
    use crossbeam_channel::{unbounded, bounded};
    use crossbeam_utils::thread::scope;
    use itertools::Itertools;

    let (s, r) = bounded(20);
    let mut v = vec![];

    scope(|scope| {
        scope.spawn(|_| {
            for data in r.iter() {
                v.push(data);
            }
        });
        let _sender_threads = (0..8)
            .into_iter()
            .map(|_| {
                // ** Clone the sender, move it into the thread:
                let s = s.clone();
                scope.spawn(move |_| {
                    // let s_clone = s.clone();
                    for i in 0..4 {
                        // std::thread::sleep(std::time::Duration::from_millis(100));
                        match s.send(i) {
                            Ok(_) => {
                                // println!("sent i {:?}", i);
                                ()
                            },
                            Err(_)=> {
                                // println!("error sending i {:?}", i);
                                ()
                            }
                        };
                    }
                    // drop(s_clone);
                })
            })
            .collect_vec();

        // ** Drop the remaining sender.
        drop(s);
    }).expect("scope error.");
    println!("{:?}", v);
}

注意添加 let s = s.clone(); 并通过添加 move 对以下闭包进行更改,以便闭包获得克隆的所有权。然后我们将 drop(s) 移入范围。现在,一旦所有发送方线程都退出,通道的发送方将关闭,接收方的 for 循环将终止。

(Playground)