使用人造丝并行处理文件

Parallelising file processing using rayon

我的本地源文件夹中有 7 个 CSV 文件(每个 55 MB),我想将其转换为 JSON 格式并存储到本地文件夹中。我的 OS 是 MacOS(四核英特尔 i5)。 基本上,它是一个简单的 Rust 程序,它是 运行 来自控制台的

./target/release/convert <source-folder> <target-folder>

我使用 Rust 线​​程的多线程方法很糟糕

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;

    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for h in handles {
        let _ = h.join();
    }

    Ok(())
}

我 运行 它使用 time 来测量 CPU 利用率,这给出了

2.93s user 0.55s system 316% cpu 1.098 total

然后我尝试使用 rayon(线程池)crate 实现相同的任务:

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;
    let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;

    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        pool.install(|| {
            let _ = convert(&source_path, &target_path);
        });
    }

    Ok(())
}

我 运行 它使用 time 来测量 CPU 利用率,这给出了

2.97s user 0.53s system 98% cpu 3.561 total

我在使用人造丝时看不到任何改进。我可能以错误的方式使用人造丝。 有谁知道它有什么问题吗?

更新(4 月 9 日)

经过一段时间与 Rust 检查器的斗争,只是想分享 一个解决方案,也许它可以帮助其他人,或者其他人可以提出更好的建议 approach/solution

pool.scope(move |s| {
        for source_path in paths {
            let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
            s.spawn(move |_s| {
                convert(&source_path, &target_path).unwrap();
            });
        }
    });

但仍然没有击败使用 rust std::thread 处理 113 个文件的方法。

46.72s user 8.30s system 367% cpu 14.955 total

更新(4 月 10 日)

@maxy 评论后

// rayon solution
paths.into_par_iter().for_each(|source_path| {
        let target_path = create_target_file_path(&source_path, &target_dir);

        match target_path {
            Ok(target_path) => {
                info!(
                    "Processing {}",
                    target_path.to_str().unwrap_or("Unable to convert")
                );
                let res = convert(&source_path, &target_path);
                if let Err(e) = res {
                    error!("{}", e);
                }
            }
            Err(e) => error!("{}", e),
        }
    });
    // std::thread solution
    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;
        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for handle in handles {
        let _ = handle.join();
    }

57个文件比较:

std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon:        23.36s user 4.08s system 324% cpu 8.464 total

rayon install 的文档不是很清楚,但是签名:

pub fn install<OP, R>(&self, op: OP) -> R where
    R: Send,
    OP: FnOnce() -> R + Send, 

说 returns 输入 R。与闭包 returns 相同的类型 R。所以显然install()还要等结果

这仅在闭包产生额外任务时才有意义,例如通过在闭包内使用 .par_iter()。我建议在文件列表上直接使用 rayon 的 parallel iterators(而不是 for 循环)。您甚至不需要创建自己的线程池,默认线程池通常就可以了。

如果您坚持手动操作,则必须使用 spawn() instead of install. And you'll probably have to move your loop into a lambda passed to scope()