将 csv 格式的数据从 ChildStdout 流式传输到文件的最快方法
Fastest method of streaming csv formatted data from ChildStdout to File
我正在寻找加快将数据写入文件的速度的方法,我尝试了几种方法但都没有成功。
本质上这个过程是
- 生成一个子进程,通过 BQ 向 BigQuery 发送请求
CLI API.
- 使用
std::io::copy
将数据流式传输到 MutexGuard<Box<dyn Write>>
。
这是我目前拥有的代码的一个小复制品。这可能不会自行编译,但我认为这足以详细解释我在做什么。
use std::process::{Command, ChildStdout, Child, Stdio};
use std::time::Duration;
use std::io::{BufReader, BufWriter, Read, Write};
use std::sync::MutexGuard;
use std::box::Box;
const CAP: usize = 1024 * 8;
fn main() {
let cmd = "bq --format csv query";
let cmd = cmd.split_whitespace();
let query = "path/to/query.sql"
let (mut child, stdout) = spawn(cmd, query).await?
let save_path = "path/to/save/file.csv"
let file = File::open(save_path).unwrap();
let mut rdr = BufReader::with_capacity(CAP, stdout);
let wtr = Mutex::new(Box::new(BufWriter::with_capacity(CAP, file)));
write(&mut child, &mut rdr, &wtr).unwrap();
}
fn create_cmd(args: Vec<&str>) -> Result<Command> {
let mut v_itr = args.into_iter();
let mut cmd = match v_itr.next() {
Some(s) => Command::new(which(s).unwrap()),
None => return Err("Must supply at least one command argument.".into())
};
while let Some(s) = v_itr.next() {
// This is essentially Vec's "push", but for adding arguments to a shell command.
cmd.arg(s);
};
Ok(cmd)
}
fn spawn(cmd: Vec<u8>, query: &str) -> Result<ChildStdout> {
let cmd = create_cmd(cmd)
let query = std::fs::read_to_string(query).unwrap();
let query = query.trim();
cmd.arg(query);
let mut child = match cmd
.stdout(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => return Err(e.into()),
};
// Sleep while waiting for the child process to fully start,
// otherwise we may get an error when getting a handle to stdout.
let sleep_time = Duration::new(2, 0);
sleep(sleep_time);
Ok(child, stdout)
}
fn write<R: Read, W: Write>(child: &mut Child, rdr: &mut R, wtr: Mutex<Box<W>>) -> Result<()> {
let mut wtr_lock = wtr.lock().unwrap();
let mut wtr_ref = &mut *wtr_lock;
while let Ok(None) = child.try_wait() {
std::io::copy(rdr, wtr_ref).unwrap();
}
Ok(())
}
到目前为止我已经尝试过:
- 使用标准 BufWriter 和 BufReader 的 tokio 等价物
- 不使用任何缓冲输入
- 使用 LineWriter 进行输出
- 使用无缓冲读取的缓冲写入
- 从
std
实现异步等效于 stack_buffer_copy
但使用 Tokio(这非常慢 - 我基本上逐字逐句地复制了 std 中的内容,然后为包装的结构实现了 BufMut ReadBuf)
我正在编写的数据大小实际上可以达到几 GiB,但我不担心低于 20 MiB 的任何数据的速度。目前,我的速度略高于 1 MiB/s,这比我想要的要慢得多。我不介意解决方案是否复杂,我只是想看看有什么可能(我这样做主要是作为一个学习机会)。
附带说明一下,BQ 的响应通常只需要 5~10 秒(BQ 命令实际上会向 stderr 打印类似 Waiting on bqjob_**** ... (10s) Current status: DONE
的内容),所以我知道这不是在这种情况下的瓶颈。我使用的计算机是 Mac M1 Air,内存为 16GiB。我正在使用以下命令进行编译:RUSTFLAGS="--emit=asm" cargo build --release
.
如果你不对数据做任何处理,我会尽量避免管道和复制数据,让子进程直接写入那个文件。
所以不是这个:
cmd.stdout(Stdio::piped())
你可以通过你的 file
:
cmd.stdout(file)
这应该有效,因为 Stdio
实现了 From<File>
。有一个这样的例子 here(尽管使用的是标准输入,而不是标准输出)。
要测量您的基线速度,您可以运行来自终端的命令:
$ bq ... > path/to/save/file.csv
然后你就会知道在速度方面会发生什么。
P.S。如果您好奇为什么您的程序运行缓慢,请尝试使用 flamegraph.
我正在寻找加快将数据写入文件的速度的方法,我尝试了几种方法但都没有成功。 本质上这个过程是
- 生成一个子进程,通过 BQ 向 BigQuery 发送请求 CLI API.
- 使用
std::io::copy
将数据流式传输到MutexGuard<Box<dyn Write>>
。
这是我目前拥有的代码的一个小复制品。这可能不会自行编译,但我认为这足以详细解释我在做什么。
use std::process::{Command, ChildStdout, Child, Stdio};
use std::time::Duration;
use std::io::{BufReader, BufWriter, Read, Write};
use std::sync::MutexGuard;
use std::box::Box;
const CAP: usize = 1024 * 8;
fn main() {
let cmd = "bq --format csv query";
let cmd = cmd.split_whitespace();
let query = "path/to/query.sql"
let (mut child, stdout) = spawn(cmd, query).await?
let save_path = "path/to/save/file.csv"
let file = File::open(save_path).unwrap();
let mut rdr = BufReader::with_capacity(CAP, stdout);
let wtr = Mutex::new(Box::new(BufWriter::with_capacity(CAP, file)));
write(&mut child, &mut rdr, &wtr).unwrap();
}
fn create_cmd(args: Vec<&str>) -> Result<Command> {
let mut v_itr = args.into_iter();
let mut cmd = match v_itr.next() {
Some(s) => Command::new(which(s).unwrap()),
None => return Err("Must supply at least one command argument.".into())
};
while let Some(s) = v_itr.next() {
// This is essentially Vec's "push", but for adding arguments to a shell command.
cmd.arg(s);
};
Ok(cmd)
}
fn spawn(cmd: Vec<u8>, query: &str) -> Result<ChildStdout> {
let cmd = create_cmd(cmd)
let query = std::fs::read_to_string(query).unwrap();
let query = query.trim();
cmd.arg(query);
let mut child = match cmd
.stdout(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => return Err(e.into()),
};
// Sleep while waiting for the child process to fully start,
// otherwise we may get an error when getting a handle to stdout.
let sleep_time = Duration::new(2, 0);
sleep(sleep_time);
Ok(child, stdout)
}
fn write<R: Read, W: Write>(child: &mut Child, rdr: &mut R, wtr: Mutex<Box<W>>) -> Result<()> {
let mut wtr_lock = wtr.lock().unwrap();
let mut wtr_ref = &mut *wtr_lock;
while let Ok(None) = child.try_wait() {
std::io::copy(rdr, wtr_ref).unwrap();
}
Ok(())
}
到目前为止我已经尝试过:
- 使用标准 BufWriter 和 BufReader 的 tokio 等价物
- 不使用任何缓冲输入
- 使用 LineWriter 进行输出
- 使用无缓冲读取的缓冲写入
- 从
std
实现异步等效于stack_buffer_copy
但使用 Tokio(这非常慢 - 我基本上逐字逐句地复制了 std 中的内容,然后为包装的结构实现了 BufMut ReadBuf)
我正在编写的数据大小实际上可以达到几 GiB,但我不担心低于 20 MiB 的任何数据的速度。目前,我的速度略高于 1 MiB/s,这比我想要的要慢得多。我不介意解决方案是否复杂,我只是想看看有什么可能(我这样做主要是作为一个学习机会)。
附带说明一下,BQ 的响应通常只需要 5~10 秒(BQ 命令实际上会向 stderr 打印类似 Waiting on bqjob_**** ... (10s) Current status: DONE
的内容),所以我知道这不是在这种情况下的瓶颈。我使用的计算机是 Mac M1 Air,内存为 16GiB。我正在使用以下命令进行编译:RUSTFLAGS="--emit=asm" cargo build --release
.
如果你不对数据做任何处理,我会尽量避免管道和复制数据,让子进程直接写入那个文件。
所以不是这个:
cmd.stdout(Stdio::piped())
你可以通过你的 file
:
cmd.stdout(file)
这应该有效,因为 Stdio
实现了 From<File>
。有一个这样的例子 here(尽管使用的是标准输入,而不是标准输出)。
要测量您的基线速度,您可以运行来自终端的命令:
$ bq ... > path/to/save/file.csv
然后你就会知道在速度方面会发生什么。
P.S。如果您好奇为什么您的程序运行缓慢,请尝试使用 flamegraph.