How can I use threads with functions that call methods on self?

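While trying to write some parallel code, I keep running into errors like the one shown below. I have found some solutions, but they all involve locking, which I don't want to do. Is there any way around this?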

pub trait PermBrute {
    fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
        let mut vec : Vec<(f64, Vec<usize>)> = Vec::new();
        let results = Arc::new(Mutex::new(vec));
        let mut threads = vec![];

        for i in 0..*CPUS {
            threads.push( thread::spawn({
                let clone = Arc::clone(&results);
                let text = ciphertext.clone();
                move || {
                    // some code here
                    let hold = self.decrypt(&text, &key); // key: built in the elided code above
                    // some more code here

                    let mut v = clone.lock().unwrap();
                    v.push(best_key);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        let lock = Arc::try_unwrap(results).expect("Lock still has multiple owners");
        let mut hold = lock.into_inner().expect("Mutex cannot be locked");

        // do some stuff with hold and return out

        return out;
    }

    fn decrypt( &self, ciphertext : &String, key : &Vec<usize>) -> String;
}
This fails to compile with:

error[E0277]: `Self` cannot be shared between threads safely
   --> src/ciphers/cipher.rs:131:27
    |
108 |     fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
    |                                                                            - help: consider further restricting `Self`: `where Self: std::marker::Sync`
...
131 |             threads.push( thread::spawn({
    |                           ^^^^^^^^^^^^^ `Self` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `Self`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&Self`
    = note: required because it appears within the type `[closure@src/ciphers/cipher.rs:140:17: 164:18 text:std::string::String, start_len:usize, end_len:usize, count:usize, start_points_clone:std::vec::Vec<usize>, local_self:&Self, end:usize, clone:std::sync::Arc<std::sync::Mutex<std::vec::Vec<(f64, std::vec::Vec<usize>)>>>]`

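With the rayon crate, this can be done using parallel iterators. Note that the trait needs a Sync bound, because the closure borrows self and rayon may run it on several threads at once: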

use rayon::prelude::*;

// `Sync` is required: rayon may run the closure below on several threads,
// and every call borrows `&self`.
pub trait PermBrute: Sync {
    fn quadgram(&self, max_len: usize, ciphertext: &String) -> Vec<usize> {
        let best_keys: Vec<(f64, Vec<usize>)> = (0..*CPUS)
            .into_par_iter()
            .map(|i| {
                // some code here
                // you can access `ciphertext` and `self` here directly without copying,
                // e.g. self.decrypt(ciphertext, &key)
                let best_key: (f64, Vec<usize>) = todo!();
                // some more code here

                best_key
            })
            .collect();

        // do some stuff with best_keys and return out
        todo!()
    }
    fn decrypt(&self, ciphertext: &String, key: &Vec<usize>) -> String;
}

I spent some time modifying your code so that I could test it on the Rust Playground. Here is the modified source:

use std::sync::{Arc, Mutex};
use std::thread;

pub trait PermBrute {
    fn quadgram( &self, max_len: usize, ciphertext: &String ) -> Vec<usize> {
        let mut vec : Vec<(f64, Vec<usize>)> = Vec::new();
        let results = Arc::new(Mutex::new(vec));
        let mut threads = vec![];

        for i in 0..10 {
            threads.push( thread::spawn({
                let clone = Arc::clone(&results);
                let text = ciphertext.clone();
                move || {
                    // some code here
                    let hold = self.decrypt( &String::new(), &vec![] );
                    // some more code here

                    let mut v = clone.lock().unwrap();
                    // v.push(best_key);
                }
            }));
        }

        for t in threads {
            t.join().unwrap();
        }

        let lock = Arc::try_unwrap(results).expect("Lock still has multiple owners");
        let mut hold = lock.into_inner().expect("Mutex cannot be locked");

        // do some stuff with hold and return out

        // return out;
        unimplemented!()
    }

    fn decrypt(&self, ciphertext: &String, key: &Vec<usize>) -> String;
}

First, you can restrict Self like this:

pub trait PermBrute: Sync {}

Then rustc starts complaining about lifetimes:

(The error message is too long to paste here; see the playground.)

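That should answer your question. In short, a spawned thread runs in the background, and rustc is not smart enough to take your join into account. Workarounds include Arc<Self> and AtomicPtr<Self>.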
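On Rust 1.63 and later, std::thread::scope can avoid both workarounds and the Mutex. Threads spawned inside a scope may borrow self and ciphertext, because the scope joins all of them before it returns. Each thread can also hand back its result through its join handle, so no locking is needed. The sketch below makes several assumptions that are not from the original code: a fixed WORKERS count replaces CPUS, a hypothetical score method stands in for the elided quadgram scoring, and each worker tries a made-up rotation key purely for illustration.

```rust
use std::thread;

// Hypothetical worker count; the original code uses a `CPUS` value defined elsewhere.
const WORKERS: usize = 4;

// `Sync` is required because every scoped thread borrows `&self` at the same time.
pub trait PermBrute: Sync {
    fn decrypt(&self, ciphertext: &String, key: &Vec<usize>) -> String;

    // Hypothetical scoring hook standing in for the elided quadgram logic.
    fn score(&self, plaintext: &str) -> f64;

    fn quadgram(&self, max_len: usize, ciphertext: &String) -> Vec<usize> {
        // Each scoped thread returns its own (score, key); no Arc or Mutex needed.
        let results: Vec<(f64, Vec<usize>)> = thread::scope(|s| {
            let handles: Vec<_> = (0..WORKERS)
                .map(|i| {
                    s.spawn(move || {
                        // Hypothetical candidate key: 0..max_len rotated by i.
                        let key: Vec<usize> = (0..max_len).map(|j| (i + j) % max_len).collect();
                        // Borrows `self` and `ciphertext` directly; the scope
                        // guarantees this thread is joined before they go away.
                        let text = self.decrypt(ciphertext, &key);
                        (self.score(&text), key)
                    })
                })
                .collect();
            // Join every worker and gather its result.
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });

        // Keep the key with the highest score (empty if there were no results).
        results
            .into_iter()
            .max_by(|a, b| a.0.total_cmp(&b.0))
            .map(|(_, key)| key)
            .unwrap_or_default()
    }
}
```

The results travel back through the return values of the join handles instead of a shared Vec. This is why the Arc<Mutex<...>> from the question is no longer needed.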

Update

Let's start with a minimal example:

use std::thread;

fn try_to_spawn() {
    let x: String = "5".to_string();
    let j = thread::spawn(|| {
        println!("{}", x.len());
    });
    j.join().unwrap();
}

Here, rustc says:

error[E0373]: closure may outlive the current function, but it borrows `x`, which is owned by the current function
 --> src/lib.rs:5:27
  |
5 |     let j = thread::spawn(|| {
  |                           ^^ may outlive borrowed value `x`
6 |         println!("{}", x.len());
  |                        - `x` is borrowed here
  |

help: to force the closure to take ownership of `x` (and any other referenced variables), use the `move` keyword
  |
5 |     let j = thread::spawn(move || {
  |                           ^^^^

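Here rustc complains about the lifetime of the borrow of x. Its reasoning goes like this. The spawned thread runs in the background and may finish before or after try_to_spawn returns, so x might already have been dropped when x.len() executes. (Concretely, thread::spawn requires its closure to be 'static.)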

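But we clearly join the thread at the end of the function, so x certainly lives long enough. From a human point of view, a 'static lifetime is not actually required. However, rustc is not smart enough to see this: the signature of thread::spawn demands 'static no matter what, so the compiler knows nothing about our join!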
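For comparison, here is a sketch of the same minimal example using scoped threads, which assumes Rust 1.63 or later. thread::scope joins every thread spawned inside it before returning, so its spawn does not require 'static and the closure may borrow x:

```rust
use std::thread;

// Scoped threads (Rust 1.63+) may borrow local variables, because
// `thread::scope` joins every thread it spawned before it returns.
fn try_to_spawn() -> usize {
    let x: String = "5".to_string();
    let len = thread::scope(|s| {
        let j = s.spawn(|| x.len()); // borrows `x`; no `move` and no Arc needed
        j.join().unwrap()
    });
    println!("{}", x.len()); // `x` is still usable here
    len
}
```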

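You could move x into the closure instead of borrowing it, but then you can no longer use x afterwards. To solve the problem in a "safe" way, use Arc<String>: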

use std::thread;
use std::sync::Arc;

fn try_to_spawn() {
    let x: Arc<String> = Arc::new("5".to_string());
    let x_clone: Arc<String> = x.clone();
    let j = thread::spawn(move || {
        println!("{}", x_clone.len());
    });
    j.join().unwrap();
    println!("{}", x.len());
}
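If only the spawned thread needs x until the join, you can also avoid Arc. Move x into the closure and return it as the thread's result, so the caller gets ownership back from join. This is a minimal sketch of that pattern:

```rust
use std::thread;

// Move `x` into the thread and hand it back as the thread's return value,
// so the caller owns it again after `join`.
fn try_to_spawn() -> String {
    let x: String = "5".to_string();
    let j = thread::spawn(move || {
        println!("{}", x.len());
        x // returned to whoever joins this thread
    });
    let x = j.join().unwrap();
    println!("{}", x.len());
    x
}
```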

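But Arc has overhead. One might want to use raw pointers (*const String or *mut String) to avoid the lifetime checks. However, raw pointers are neither Send nor Sync, so they cannot be moved into a thread. To share a resource between threads through a pointer, you can use AtomicPtr (here is a discussion about making raw pointers Send + Sync).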
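The approach from that discussion can be sketched without AtomicPtr. Wrap the raw pointer in a newtype (the name SendPtr is hypothetical) and assert Send for it with an unsafe impl. The compiler checks none of this. It is sound here only because the thread is joined before x is dropped and String is Sync. The get method matters on edition 2021: it makes the closure capture the whole wrapper rather than just its non-Send pointer field.

```rust
use std::thread;

// Raw pointers are not Send, so wrap one in a newtype and assert Send manually.
// SAFETY contract (upheld by `try_to_spawn` below): the pointee is Sync and
// stays alive until the thread has been joined.
struct SendPtr(*const String);
unsafe impl Send for SendPtr {}

impl SendPtr {
    // Going through a method makes the `move` closure capture the whole
    // `SendPtr` (which is Send) instead of only its `*const String` field.
    fn get(&self) -> *const String {
        self.0
    }
}

fn try_to_spawn() -> usize {
    let x: String = "5".to_string();
    let ptr = SendPtr(&x as *const String);
    let j = thread::spawn(move || {
        // SAFETY: `x` outlives this thread because we join before returning.
        unsafe { (*ptr.get()).len() }
    });
    let n = j.join().unwrap();
    println!("{}", x.len());
    n
}
```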


Back to your question: what about your self (of type &Self)? Of course, it is a reference too! And rustc cannot figure out its "real lifetime" either:

use std::thread;
use std::sync::Arc;

struct S;

impl S {
    fn try_to_spawn(&self) {
        let j = thread::spawn(|| {
            self.do_something();
        });
        j.join().unwrap();
    }
    
    fn do_something(&self) {}
}

This produces the error message:

error[E0495]: cannot infer an appropriate lifetime due to conflicting requirements
  --> src/lib.rs:8:31
   |
8  |           let j = thread::spawn(|| {
   |  _______________________________^
9  | |             self.do_something();
10 | |         });
   | |_________^
   |

This error does not look like the previous lifetime error, but it is closer to the one in your code. To fix it, you can again use Arc<Self>:

fn try_to_spawn(self: Arc<Self>) {
    let j = thread::spawn(move || {
        self.do_something();
    });
    j.join().unwrap();
}
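The snippet above shows only the method. The caller must also own the value through an Arc. Here is a self-contained sketch that adds a hypothetical name field to S so the thread has something to read. The caller clones the Arc for the call and keeps its own handle:

```rust
use std::sync::Arc;
use std::thread;

struct S {
    name: String, // hypothetical field so the thread has something to read
}

impl S {
    // `self: Arc<Self>` lets the `move` closure own a reference-counted handle,
    // which is 'static and Send (because S is Send + Sync).
    fn try_to_spawn(self: Arc<Self>) -> usize {
        let j = thread::spawn(move || self.do_something());
        j.join().unwrap()
    }

    fn do_something(&self) -> usize {
        self.name.len()
    }
}

fn run() -> (usize, usize) {
    let s = Arc::new(S { name: "cipher".to_string() });
    // Clone the Arc for the call so the caller keeps its own handle.
    let from_thread = Arc::clone(&s).try_to_spawn();
    let from_caller = s.do_something();
    (from_thread, from_caller)
}
```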

Or use AtomicPtr<Self>:

use std::thread;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::Relaxed;

struct S;

impl S {
    fn try_to_spawn(&self) {
        let self_ptr: AtomicPtr<Self> 
            = AtomicPtr::new(self as *const Self as *mut Self);
        let j = thread::spawn(move || {
            unsafe {
                self_ptr.load(Relaxed) // *mut Self
                    .as_ref()          // Option<&Self>
                    .unwrap()          // &Self
                    .do_something();
            }
        });
        j.join().unwrap();
    }
    
    fn do_something(&self) {}
}

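This works, but it is ugly. It is also only sound because the thread is joined before self goes out of scope and S is Sync. AtomicPtr<T> is Send and Sync for any T, so it bypasses the compiler's checks and nothing enforces either condition. I would also recommend a crate like rayon for parallel computation. Still, I hope this answer helps in cases where you want to create threads manually.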