Rust Mutex is not working when using a callback function from multiple C threads created by `fork`

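I am using the C library Cuba, which calls a callback function from multiple threads created in C. Cuba's parallelization is based on the `fork`/`wait` POSIX functions rather than pthreads (arxiv.org/abs/1408.6373). It passes the current thread in the `core` parameter.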
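To make the fork/wait model concrete, here is a rough std-only sketch of what such a parallelization looks like from Rust. It is *not* Cuba's actual implementation: `run_forked_workers` and `callback` are hypothetical names, and `fork`, `waitpid` and `_exit` are declared over FFI by hand (Unix only, assuming `pid_t` is a 32-bit `int` as on Linux and macOS). Each "core" is a separate child process with its own copy of memory, and the parent waits for all of them:

```rust
use std::os::raw::c_int;

// Hand-written POSIX declarations (Unix only).
extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut c_int, options: c_int) -> i32;
    fn _exit(status: c_int) -> !;
}

/// Hypothetical sketch: run `callback(core)` in `cores` forked child
/// processes and return each child's raw wait status, in core order.
fn run_forked_workers(cores: i32, callback: fn(i32) -> c_int) -> Vec<c_int> {
    let mut pids = Vec::new();
    for core in 0..cores {
        let pid = unsafe { fork() };
        if pid < 0 {
            panic!("fork failed");
        }
        if pid == 0 {
            // Child: a separate process with its own copy of all memory.
            let code = callback(core);
            unsafe { _exit(code) };
        }
        pids.push(pid);
    }
    // Parent: the "wait" half of fork/wait.
    pids.into_iter()
        .map(|pid| {
            let mut status: c_int = 0;
            let r = unsafe { waitpid(pid, &mut status, 0) };
            assert_eq!(r, pid, "waitpid failed");
            status
        })
        .collect()
}

/// Stand-in callback: the exit code reports which core ran it.
fn callback(core: i32) -> c_int {
    core
}

fn main() {
    let statuses = run_forked_workers(4, callback);
    for (core, status) in statuses.iter().enumerate() {
        // WEXITSTATUS(status) is (status >> 8) & 0xff on Linux and macOS.
        println!("core {} exited with {}", core, (*status >> 8) & 0xff);
    }
}
```

Because each worker is a process, anything the callback changes in memory (including a `Mutex`) stays in that process; here only the exit status makes it back to the parent.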

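I am trying to log the results of this callback function to the screen and to a file. If I use `println!`, I get the expected output, but if I use `slog`, the output is mangled when I use a `Mutex` drain. If I use the async drain, I get no output at all.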

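Is the `Mutex` not locking because it can't see that the function is actually being called from another thread? I tried to recreate the problem with Rust threads, but couldn't. Ideally, I'd like to get the async drain working.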
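For comparison, the threaded case does behave. In this minimal std-only sketch every thread holds a clone of one `Arc<Mutex<_>>` (roughly how cloned loggers share a drain; I'm assuming `slog::Logger` clones share their drain through an `Arc`) and writes a three-line record while holding the lock. The `Vec<String>` sink and `log_from_threads` are hypothetical stand-ins for the terminal drain, chosen so the result can be checked:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawn `cores` threads; each writes a three-line record ("A", "B", "C",
/// tagged with its core) to one shared sink while holding the lock.
fn log_from_threads(cores: usize) -> Vec<String> {
    let sink = Arc::new(Mutex::new(Vec::new()));
    // One cloned handle per "core", all pointing at the same Mutex.
    let handles: Vec<_> = (0..cores).map(|_| Arc::clone(&sink)).collect();

    let threads: Vec<_> = handles
        .into_iter()
        .enumerate()
        .map(|(core, sink)| {
            thread::spawn(move || {
                let mut out = sink.lock().unwrap();
                for line in ["A", "B", "C"] {
                    out.push(format!("{} {}", core, line));
                    // Yield while holding the lock: other threads still cannot get in.
                    thread::yield_now();
                }
            })
        })
        .collect();

    for t in threads {
        t.join().unwrap();
    }
    // Every thread has finished and dropped its clone, so this is the last one.
    Arc::try_unwrap(sink).unwrap().into_inner().unwrap()
}

fn main() {
    for line in log_from_threads(4) {
        println!("{}", line);
    }
}
```

Each record's three lines always come out together, which is why a Rust-threads reproduction does not show the mangling.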

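Below is an example program that exhibits the problematic behavior. The callback receives the last argument of the `vegas` function as one of its arguments; this is a vector of logger clones. That way, each core should have its own copy of the logger: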

```rust
#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;

use slog::Drain;

// this function is called from different C threads
// `core` indicates which thread
fn integrand(
    _x: &[f64],
    _f: &mut [f64],
    loggers: &mut Vec<slog::Logger>,
    _nvec: usize,
    core: i32,
) -> Result<(), &'static str> {
    info!(loggers[core as usize], "A\nB\nC");

    Ok(())
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let mut integrator = cuba::CubaIntegrator::new(integrand);
    integrator.set_cores(10, 1000); // set 10 cores

    integrator.vegas(
        1,
        1,
        cuba::CubaVerbosity::Progress,
        0,
        vec![log.clone(); 11],
    );
}
```

Output:

```text
C 
INFO Mar 26A
B
C 10:27
:42.195 MarINFO 26  10:27A
B
C:42.195
 MarINFO 26  10:27A
B
C:42.195
 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
```

The Cuba C library has this to say:

> Windows users: Cuba 3 and up uses fork(2) to parallelize the execution threads. This POSIX function is not part of the Windows API, however, and is furthermore used in an essential way such that it cannot be worked around simply with CreateProcess etc. The only feasible emulation seems to be available through Cygwin.

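Here's a reproduction of the code. We `fork`, and then both the child and the parent try to hold the mutex while printing. A sleep is inserted to encourage the OS scheduler to switch to the other process: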

```rust
use nix::unistd::{fork, ForkResult}; // 0.13.0
use std::{sync::Mutex, thread, time::Duration};

fn main() {
    let shared = Mutex::new(10);

    // After `fork`, parent and child each own an independent copy of `shared`,
    // so both locks succeed immediately and the output interleaves.
    match fork() {
        Ok(ForkResult::Parent { .. }) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Parent");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Ok(ForkResult::Child) => {
            let max = shared.lock().unwrap();
            for _ in 0..*max {
                println!("Child");
                thread::sleep(Duration::from_millis(10));
            }
        }
        Err(e) => {
            eprintln!("Error: {}", e);
        }
    }
}
```
```text
$ cargo run

Parent
Child
Parent
Child
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
```

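Using `fork` with threads is *really* painful to deal with; I distinctly remember hunting down terrible problems related to this before. Two in-depth resources I found: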

The latter says (emphasis mine):

> Can I create mutex before fork-ing?
>
> Yes - however the child and parent process will not share virtual memory and each one will have a mutex independent of the other.
>
> (Advanced note: There are advanced options using shared memory that allow a child and parent to share a mutex if it's created with the correct options and uses a shared memory segment. See procs, fork(), and mutexes)
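A std-only sketch of that point (Unix only; `fork`, `waitpid` and `_exit` are declared over FFI by hand, and `mutex_after_fork` is a hypothetical helper): the child locks *its* copy of the `Mutex` and increments the value, and the parent, after waiting for the child, still sees the original value, because the two processes never shared the lock or the data.

```rust
use std::os::raw::c_int;
use std::sync::Mutex;

// Hand-written POSIX declarations (Unix only).
extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut c_int, options: c_int) -> i32;
    fn _exit(status: c_int) -> !;
}

/// Returns (value the child saw after its update, value the parent sees afterwards).
fn mutex_after_fork() -> (i32, i32) {
    let shared = Mutex::new(10);
    let pid = unsafe { fork() };
    assert!(pid >= 0, "fork failed");
    if pid == 0 {
        // Child: this locks the child's own copy of the Mutex.
        let mut v = shared.lock().unwrap();
        *v += 1;
        // Report the child's view through the exit status (0..=255).
        unsafe { _exit(*v) };
    }
    let mut status: c_int = 0;
    assert_eq!(unsafe { waitpid(pid, &mut status, 0) }, pid);
    let child_value = (status >> 8) & 0xff; // WEXITSTATUS
    let parent_value = *shared.lock().unwrap();
    (child_value, parent_value)
}

fn main() {
    let (child, parent) = mutex_after_fork();
    println!("child saw {}, parent still sees {}", child, parent);
}
```

This reports 11 for the child and 10 for the parent: locking in one process cannot exclude the other.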


> If I use the async drain I get no output at all.
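My guess (an assumption, not something stated in the quoted material) is that this follows from how `fork` treats threads: POSIX creates the child with a single thread, the one that called `fork`. An async drain hands records to a background thread, which therefore does not exist in the forked workers, so nothing ever writes their records. The sketch below models this with std only: `start_worker` is a hypothetical stand-in for the drain's background thread, and `fork`, `waitpid` and `_exit` are declared over FFI (Unix only).

```rust
use std::os::raw::c_int;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

// Hand-written POSIX declarations (Unix only).
extern "C" {
    fn fork() -> i32;
    fn waitpid(pid: i32, status: *mut c_int, options: c_int) -> i32;
    fn _exit(status: c_int) -> !;
}

/// Stand-in for an async drain: a background thread that "writes" (counts)
/// each record announced through `pending`.
fn start_worker(pending: Arc<AtomicBool>, written: Arc<AtomicUsize>) {
    thread::spawn(move || loop {
        if pending.swap(false, Ordering::SeqCst) {
            written.fetch_add(1, Ordering::SeqCst);
        }
        thread::sleep(Duration::from_millis(1));
    });
}

/// Submit one record, wait up to 500 ms for it to be written, return the count.
fn submit_and_wait(pending: &AtomicBool, written: &AtomicUsize) -> usize {
    pending.store(true, Ordering::SeqCst);
    let deadline = Instant::now() + Duration::from_millis(500);
    while written.load(Ordering::SeqCst) == 0 && Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }
    written.load(Ordering::SeqCst)
}

/// Returns (records written as seen by the child, as seen by the parent).
fn demo() -> (usize, usize) {
    let pending = Arc::new(AtomicBool::new(false));
    let written = Arc::new(AtomicUsize::new(0));
    start_worker(Arc::clone(&pending), Arc::clone(&written));

    let pid = unsafe { fork() };
    assert!(pid >= 0, "fork failed");
    if pid == 0 {
        // Child: only the forking thread was copied; no worker serves `pending`.
        let n = submit_and_wait(&pending, &written);
        unsafe { _exit(n as c_int) };
    }
    let mut status: c_int = 0;
    assert_eq!(unsafe { waitpid(pid, &mut status, 0) }, pid);
    let child_written = ((status >> 8) & 0xff) as usize;
    // Parent: the worker thread is alive here, so the record gets written.
    let parent_written = submit_and_wait(&pending, &written);
    (child_written, parent_written)
}

fn main() {
    let (child, parent) = demo();
    println!("written in child: {}, written in parent: {}", child, parent);
}
```

The child should report 0 records written and the parent 1, matching "no output at all" from the forked workers.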

See also:


I would *not* trust the Cuba Rust library. There are two main points:

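  1. If threads are being created, the user data generic type should have a `Sync` or `Send` bound on it, restricting it to only those types that are safe to share / transfer between threads.

  2. The user data passed to the `integrand` function should *not* be `&mut`. A fundamental Rust concept is that there can only be a single mutable reference to any piece of data at any time. Cuba trivially lets you circumvent this.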
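A hypothetical sketch of what those two points could look like for a *thread*-based API (this is not the cuba crate's interface, and it does nothing to make `fork` safe): the user data is passed as `&T` with `T: Sync`, so no `&mut` is ever aliased, and `std::thread::scope` (Rust 1.63+) lets the workers borrow it. Anything that must mutate, like a log, brings its own synchronization:

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical callback type: shared (not `&mut`) user data plus the core index.
type Integrand<T> = fn(&T, usize);

/// Run `callback` once per core on scoped threads. `T: Sync` is exactly the
/// bound that makes handing `&T` to several threads at once sound.
fn run_on_cores<T: Sync>(user_data: &T, cores: usize, callback: Integrand<T>) {
    thread::scope(|s| {
        for core in 0..cores {
            s.spawn(move || callback(user_data, core));
        }
    }); // every scoped thread is joined before `scope` returns
}

fn integrand(log: &Mutex<Vec<String>>, core: usize) {
    // Mutation goes through the Mutex, so the data is only ever shared via `&`.
    log.lock().unwrap().push(format!("core {} done", core));
}

fn main() {
    let log = Mutex::new(Vec::new());
    run_on_cores(&log, 4, integrand);
    let mut lines = log.into_inner().unwrap();
    lines.sort();
    for line in lines {
        println!("{}", line);
    }
}
```

If the callback tried to take `&mut T`, or `T` were not `Sync` (a `RefCell<Vec<String>>`, for example), this would fail to compile, which is exactly the checking that Cuba's signature bypasses.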

Here's an attempted reproduction of the Cuba Rust and C libraries, using Rust threads in place of `fork`:

```rust
#[macro_use]
extern crate slog;

use slog::Drain;

fn integrand(loggers: &mut Vec<slog::Logger>, core: i32) {
    info!(loggers[core as usize], "A\nB\nC\n{}", core);
}

fn main() {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let logs = vec![log.clone(); 11];

    cuba_repro(logs, integrand);
}

use std::{ffi::c_void, thread};

type Integrand<T> = fn(&mut T, i32);

fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>) {
    // From the `vegas` method
    let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
    let integrand_ptr = &mut integrand as *mut _ as *mut c_void;

    unsafe { cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr) }
}

unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void) {
    let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
    let integrand = FfiDoesNotCareAboutSendOrSync(integrand);

    let threads: Vec<_> = (0..4).map(move |i| {
        thread::spawn(move || {
            // Use the whole wrappers so that Rust 2021 closures capture them
            // (which are `Send`) rather than just their non-`Send` `.0` fields.
            let (user_data, integrand) = (user_data, integrand);

            // C doesn't care about this pedantry: every thread gets a `&mut T`
            // to the same data, which is exactly the aliasing Rust forbids.
            let user_data = &mut *(user_data.0 as *const T as *mut T);
            let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);

            // From the `c_integrand` function
            let k: &mut T = &mut *(user_data as *mut _);
            let _ignored = integrand(k, i);
        })
    }).collect();

    for t in threads { t.join().unwrap() }
}

#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T> {}
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T> {}
```

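I had to make a *lot* of changes to get the Rust compiler to ignore the huge amount of unsafety and rule-breaking that the Cuba library and the related FFI are performing.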

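This code example does actually print out the 4 log statements, each separately and in order, so this is not a complete answer. However, I'm fairly certain that the Cuba library is triggering undefined behavior, which means that any outcome is possible, including apparently working.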