将 csv 格式的数据从 ChildStdout 流式传输到文件的最快方法

Fastest method of streaming csv formatted data from ChildStdout to File

我正在寻找加快将数据写入文件的速度的方法,我尝试了几种方法但都没有成功。 本质上这个过程是

这是我目前拥有的代码的一个小复制品。这可能不会自行编译,但我认为这足以详细解释我在做什么。

use std::process::{Command, ChildStdout, Child, Stdio};
use std::time::Duration;
use std::io::{BufReader, BufWriter, Read, Write};
use std::sync::MutexGuard;
use std::box::Box;

const CAP: usize = 1024 * 8;

fn main() {
    let cmd = "bq --format csv query";
    let cmd = cmd.split_whitespace();
    let query = "path/to/query.sql"
    let (mut child, stdout) = spawn(cmd, query).await?

    let save_path = "path/to/save/file.csv"
    let file = File::open(save_path).unwrap();

    let mut rdr = BufReader::with_capacity(CAP, stdout);
    let wtr = Mutex::new(Box::new(BufWriter::with_capacity(CAP, file)));
    

    write(&mut child, &mut rdr, &wtr).unwrap();

}

fn create_cmd(args: Vec<&str>) -> Result<Command> {
    let mut v_itr = args.into_iter();
    let mut cmd = match v_itr.next() { 
        Some(s) => Command::new(which(s).unwrap()),
        None => return Err("Must supply at least one command argument.".into())
    };
    
    while let Some(s) = v_itr.next() {
        // This is essentially Vec's "push", but for adding arguments to a shell command.
        cmd.arg(s);
    };

    Ok(cmd)
}

fn spawn(cmd: Vec<u8>, query: &str) -> Result<ChildStdout> {
    let cmd = create_cmd(cmd)

    let query = std::fs::read_to_string(query).unwrap();
    let query = query.trim();
    cmd.arg(query);

    let mut child = match cmd
            .stdout(Stdio::piped())
            .spawn()
    {
        Ok(c) => c,
        Err(e) => return Err(e.into()),
    };

    // Sleep while waiting for the child process to fully start,
    // otherwise we may get an error when getting a handle to stdout.
    let sleep_time = Duration::new(2, 0);
    sleep(sleep_time);

    Ok(child, stdout)
}

fn write<R: Read, W: Write>(child: &mut Child, rdr: &mut R, wtr: Mutex<Box<W>>) -> Result<()> {
    let mut wtr_lock = wtr.lock().unwrap();
    let mut wtr_ref = &mut *wtr_lock;

    while let Ok(None) = child.try_wait() {
        std::io::copy(rdr, wtr_ref).unwrap();
    }

    Ok(())
}

到目前为止我已经尝试过:

我正在编写的数据大小实际上可以达到几 GiB,但我不担心低于 20 MiB 的任何数据的速度。目前,我的速度略高于 1 MiB/s,这比我想要的要慢得多。我不介意解决方案是否复杂,我只是想看看有什么可能(我这样做主要是作为一个学习机会)。

附带说明一下,BQ 的响应通常只需要 5~10 秒(BQ 命令实际上会向 stderr 打印类似 Waiting on bqjob_**** ... (10s) Current status: DONE 的内容),所以我知道这不是在这种情况下的瓶颈。我使用的计算机是 Mac M1 Air,内存为 16GiB。我正在使用以下命令进行编译:RUSTFLAGS="--emit=asm" cargo build --release.

如果你不对数据做任何处理,我会尽量避免管道和复制数据,让子进程直接写入那个文件。

所以不是这个:

cmd.stdout(Stdio::piped())

你可以通过你的 file:

cmd.stdout(file)

这应该有效,因为 Stdio 实现了 From<File>。有一个这样的例子 here(尽管使用的是标准输入,而不是标准输出)。

要测量您的基线速度,您可以运行来自终端的命令:

$ bq ... > path/to/save/file.csv

然后你就会知道在速度方面会发生什么。

P.S。如果您好奇为什么您的程序运行缓慢,请尝试使用 flamegraph.