使用人造丝并行处理文件
Parallelising file processing using rayon
我的本地源文件夹中有 7 个 CSV 文件(每个 55 MB),我想将其转换为 JSON 格式并存储到本地文件夹中。我的 OS 是 MacOS(四核英特尔 i5)。
基本上,它是一个简单的 Rust 程序,它是 运行 来自控制台的
./target/release/convert <source-folder> <target-folder>
我使用 Rust 线程的多线程方法很糟糕
fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for h in handles {
let _ = h.join();
}
Ok(())
}
我 运行 它使用 time
来测量 CPU 利用率,这给出了
2.93s user 0.55s system 316% cpu 1.098 total
然后我尝试使用 rayon
(线程池)crate 实现相同的任务:
fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
pool.install(|| {
let _ = convert(&source_path, &target_path);
});
}
Ok(())
}
我 运行 它使用 time
来测量 CPU 利用率,这给出了
2.97s user 0.53s system 98% cpu 3.561 total
我在使用人造丝时看不到任何改进。我可能以错误的方式使用人造丝。
有谁知道它有什么问题吗?
更新(4 月 9 日)
经过一段时间与 Rust 检查器的斗争,只是想分享 一个解决方案,也许它可以帮助其他人,或者其他人可以提出更好的建议 approach/solution
pool.scope(move |s| {
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
s.spawn(move |_s| {
convert(&source_path, &target_path).unwrap();
});
}
});
但仍然没有击败使用 rust std::thread
处理 113 个文件的方法。
46.72s user 8.30s system 367% cpu 14.955 total
更新(4 月 10 日)
@maxy 评论后
// rayon solution
paths.into_par_iter().for_each(|source_path| {
let target_path = create_target_file_path(&source_path, &target_dir);
match target_path {
Ok(target_path) => {
info!(
"Processing {}",
target_path.to_str().unwrap_or("Unable to convert")
);
let res = convert(&source_path, &target_path);
if let Err(e) = res {
error!("{}", e);
}
}
Err(e) => error!("{}", e),
}
});
// std::thread solution
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for handle in handles {
let _ = handle.join();
}
57个文件比较:
std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon: 23.36s user 4.08s system 324% cpu 8.464 total
rayon install 的文档不是很清楚,但是签名:
pub fn install<OP, R>(&self, op: OP) -> R where
R: Send,
OP: FnOnce() -> R + Send,
说 returns 输入 R
。与闭包 returns 相同的类型 R
。所以显然install()
还要等结果
这仅在闭包产生额外任务时才有意义,例如通过在闭包内使用 .par_iter()
。我建议在文件列表上直接使用 rayon 的 parallel iterators(而不是 for
循环)。您甚至不需要创建自己的线程池,默认线程池通常就可以了。
如果您坚持手动操作,则必须使用 spawn() instead of install
. And you'll probably have to move your loop into a lambda passed to scope()。
我的本地源文件夹中有 7 个 CSV 文件(每个 55 MB),我想将其转换为 JSON 格式并存储到本地文件夹中。我的 OS 是 MacOS(四核英特尔 i5)。 基本上,它是一个简单的 Rust 程序,它是 运行 来自控制台的
./target/release/convert <source-folder> <target-folder>
我使用 Rust 线程的多线程方法很糟糕
fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for h in handles {
let _ = h.join();
}
Ok(())
}
我 运行 它使用 time
来测量 CPU 利用率,这给出了
2.93s user 0.55s system 316% cpu 1.098 total
然后我尝试使用 rayon
(线程池)crate 实现相同的任务:
fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
pool.install(|| {
let _ = convert(&source_path, &target_path);
});
}
Ok(())
}
我 运行 它使用 time
来测量 CPU 利用率,这给出了
2.97s user 0.53s system 98% cpu 3.561 total
我在使用人造丝时看不到任何改进。我可能以错误的方式使用人造丝。 有谁知道它有什么问题吗?
更新(4 月 9 日)
经过一段时间与 Rust 检查器的斗争,只是想分享 一个解决方案,也许它可以帮助其他人,或者其他人可以提出更好的建议 approach/solution
pool.scope(move |s| {
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
s.spawn(move |_s| {
convert(&source_path, &target_path).unwrap();
});
}
});
但仍然没有击败使用 rust std::thread
处理 113 个文件的方法。
46.72s user 8.30s system 367% cpu 14.955 total
更新(4 月 10 日)
@maxy 评论后
// rayon solution
paths.into_par_iter().for_each(|source_path| {
let target_path = create_target_file_path(&source_path, &target_dir);
match target_path {
Ok(target_path) => {
info!(
"Processing {}",
target_path.to_str().unwrap_or("Unable to convert")
);
let res = convert(&source_path, &target_path);
if let Err(e) = res {
error!("{}", e);
}
}
Err(e) => error!("{}", e),
}
});
// std::thread solution
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for handle in handles {
let _ = handle.join();
}
57个文件比较:
std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon: 23.36s user 4.08s system 324% cpu 8.464 total
rayon install 的文档不是很清楚,但是签名:
pub fn install<OP, R>(&self, op: OP) -> R where
R: Send,
OP: FnOnce() -> R + Send,
说 returns 输入 R
。与闭包 returns 相同的类型 R
。所以显然install()
还要等结果
这仅在闭包产生额外任务时才有意义,例如通过在闭包内使用 .par_iter()
。我建议在文件列表上直接使用 rayon 的 parallel iterators(而不是 for
循环)。您甚至不需要创建自己的线程池,默认线程池通常就可以了。
如果您坚持手动操作,则必须使用 spawn() instead of install
. And you'll probably have to move your loop into a lambda passed to scope()。