共享结构的 Rust 死锁:Arc + channel + atomic

Rust deadlock with shared struct: Arc + channel + atomic

我是 Rust 的新手,正试图为一个项目动态生成大量 JSON 数据,但我遇到了死锁。

我已经尝试删除序列化 (json_serde) 并改为在通道中发送 HashMap,但我的计算机上仍然出现死锁。但是,如果我评论 send(generator.next()) 行并自己发送一个字符串,代码将完美运行,因此死锁是由我的 DatasetGenerator 引起的,但我不明白为什么。

代码摘要:

取决于我在代码下方评论 tx_ref.send(generator_ref.next()) tx_ref.send(some_new_string) 是死锁还是成功:

src/main.rs:

extern crate threads_pool;

use threads_pool::*;

mod generator;

use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

fn main() {
    // N will be an argument, and a very high number. For tests use this:
    const N: i64 = 12;  // Increase this if you're not getting the deadlock yet, or run cargo run again until it happens.
    let (tx, rx) = mpsc::channel();

    let tx_producer = tx.clone();
    let producer_thread = thread::spawn(move || {
        let pool = ThreadPool::new(4);
        let generator = Arc::new(generator::data_generator::DatasetGenerator::new(3000));
        for i in 0..N {
            println!("Generating #{}", i);
            let tx_ref = tx_producer.clone();
            let generator_ref = generator.clone();
            pool.execute(move || {
                ////////// v !!!DEADLOCK HERE!!! v ////////// 
                tx_ref.send(generator_ref.next()).expect("tx failed.");              // This locks!
                //tx_ref.send(format!(" {}            ", i)).expect("tx failed.");   // This works!
                ////////// ^ !!!DEADLOCK HERE!!! ^ ////////// 
            })
            .unwrap();
        }

        println!("Generator done!");
    });

    println!("-» Consumer consuming!");
    for j in 0..N {
        let s = rx.recv().expect("rx failed");
        println!("-» Consumed #{}:   {} ...     ", j, &s[..10]);
    }
    println!("Consumer done!!");

    producer_thread.join().unwrap();
    println!("Success. Exit!");
}

这是我的 DatasetGenerator,它似乎是造成所有问题的原因(因为不使用 serde 但输出 HashMaps 仍然会导致死锁)。 src/generator/dataset_generator.rs:

use serde_json::Value;
use std::collections::HashMap;
use std::sync::atomic;

pub struct DatasetGenerator {
    num_features: usize,
    pub counter: atomic::AtomicI64,
    feature_names: Vec<String>,
}

type Datapoint = HashMap<String, Value>;
type Out = String;

impl DatasetGenerator {
    pub fn new(num_features: usize) -> DatasetGenerator {
        let mut feature_names = Vec::new();

        for i in 0..num_features {
            feature_names.push(format!("f_{}", i));
        }

        DatasetGenerator {
            num_features,
            counter: atomic::AtomicI64::new(0),
            feature_names,
        }
    }

    /// Generates the next item in the sequence (iterator-like).
    pub fn next(&self) -> Out {
        let value = self.counter.fetch_add(1, atomic::Ordering::SeqCst);
        self.gen(value)
    }

    /// Generates the ith item in the sequence. DEADLOCKS!!! ///////////////////////////
    pub fn gen(&self, ith: i64) -> Out {
        let mut data = Datapoint::with_capacity(self.num_features);

        for f in 0..self.num_features {
            let name = self.feature_names.get(f).unwrap();
            data.insert(name.to_string(), Value::from(ith));
        }

        serde_json::json!(data).to_string()  // Tried without serialization and still deadlocks!
    }
}

提交死锁代码在这里,如果你想自己尝试 cargo runhttps://github.com/AlbertoEAF/learn-rust/tree/dc5fa867e5a70b605553ef65796fdc9dd42d38a0/rest-injector

Windows 与 Rust 1.60.0 的死锁:

感谢您的帮助!非常感谢:)


更新

我遵循了@kmdreko 在下面的回答中提出的建议,显然问题出在生成器中:并非所有项目都已生成。即使 pool.execute() 被调用 N 次,即使我在离开 producer_thread 之前放置 pool.close(),也只会执行随机数 c < N 的闭包。 为什么会发生这种情况/如何解决?

修复: 原来这个锁定是由 threads_pool 库 (0.2.6) 引起的。我将线程池切换到 rayon 的,第一次尝试时它运行得很顺利。

你应该改变一件事:mpsc::Receiver 将 return 在 .recv() 上出错,如果它不可能通过意识到所有关联的 mpsc::Sender 产生结果下降了,这是一个很好的指标,表明所有工作都已完成。当它们各自的 tasks/threads 完成时,您的 tx_ref 甚至 tx_producer 将被删除,但是您仍然有 tx 的范围可以 理论上 给一个值。这就是给你带来明显僵局的原因。您应该简单地删除 tx_producer 并直接使用 tx,以便将其移入生产者线程并相应地删除。

现在,您会看到所有 N 任务都已完成,或者您会收到一条错误消息,指示某些任务未完成。并非所有任务都完成的原因是因为您正在创建线程池,生成所有任务,然后立即销毁它。闭包结束前的 threads_pool documentation says that the threads will finish their current job when the pool is destroyed, but you want to wait until all jobs have completed. For that you need to call the .close() method provided by the PoolManager 特征。

您看到不一致的行为,但受益于直接 return 字符串的原因是因为作业需要更少的工作,并且线程可以在看到退出信号之前完成所有作业。您的 generator_ref.next() 需要更多的计算,因此他们在看到被告知退出之前只处理 4-plus-a-bit 个作业也就不足为奇了。