如何在下载过程中解压缩和解压缩 tar.gz 存档?
How decompress and unpack tar.gz archive in download process?
我需要在下载过程中解压缩和解压大的 .tar.gz 文件(例如 ~5Gb),而不在磁盘上保存存档文件。
我使用 reqwest crate 下载文件,flate2 crate 解压,tar crate 解包。我尝试用 tar.gz 格式来做。但是可以使用 zip 和 tar.bz2 格式。 (哪个更容易使用?)
看起来我成功实现了这个,但没想到解包以错误结束:
thread 'main' panicked at 'Cannot unpack archive: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `gamedata-master/.vscode/settings.json` into `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Kind(UnexpectedEof) } } } }', /home/ruut/Projects/GreatWar/launcher/src/gitlab.rs:87:38
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
我的代码:
let full_url = format!("{}/{}/{}", HOST, repo_info.url, repo_info.download_url);
let mut response;
match self.client.get(&full_url).send().await {
Ok(res) => response = res,
Err(error) => {
return Err(Error::new(ErrorKind::InvalidData, error));
}
};
if response.status() == reqwest::StatusCode::OK {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item
.or(Err(format!("Error while downloading file")))
.unwrap();
let b: &[u8] = &chunk.to_vec();
let gz = GzDecoder::new(b);
let mut archive = Archive::new(gz);
archive.unpack("./gamedata").expect("Cannot unpack archive");
}
}
第一次获取块后archive.unpack
抛出错误。
我做错了什么?
Rust 具有 std::io::BufRead
特性。它管理一个内部缓冲区,可以被填充和使用,这使得它非常适合在没有中间 collect
s 的情况下传递数据。
由于 reqwest.Response
实现了 Read
,我们可以将它变成 BufReader
并传递给 flate2::bufread::GzDecoder
。
你的问题是 GzDecoder::new
期望传递给它的所有内容都是完整的存档,但除了第一个块之外的任何内容显然都不是。
最小示例(为简单起见,使用阻塞 API):
use flate2::bufread::GzDecoder;
use std::io::BufReader;
use tar::Archive;
fn main() {
let resp = reqwest::blocking::get(URL).unwrap();
let content_br = BufReader::new(resp);
let tarfile = GzDecoder::new(content_br);
let mut archive = Archive::new(tarfile);
archive.unpack("./gamedata").unwrap();
}
kmdreko 的评论解释了为什么您的代码失败 - .next()
returns 只有第一个块,您必须将所有块提供给 gzip reader。另一个答案显示了如何使用阻塞 reqwest
API.
如果你想继续使用非阻塞API,那么你可以在一个单独的线程中启动解码器,并通过一个通道向它提供数据。例如,您可以使用 flume 个同时支持同步和异步接口的频道。您还需要按照 GzDecoder
的预期将频道转换为 Read
的内容。例如(编译,但未经测试):
use std::io::{self, Read};
use flate2::read::GzDecoder;
use futures_lite::StreamExt;
use tar::Archive;
async fn download() -> io::Result<()> {
let client = reqwest::Client::new();
let full_url = "...";
let response;
match client.get(full_url).send().await {
Ok(res) => response = res,
Err(error) => {
return Err(io::Error::new(io::ErrorKind::InvalidData, error));
}
};
let (tx, rx) = flume::bounded(0);
let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let gz = GzDecoder::new(input);
let mut archive = Archive::new(gz);
archive.unpack("./gamedata").unwrap();
});
if response.status() == reqwest::StatusCode::OK {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item
.or(Err(format!("Error while downloading file")))
.unwrap();
tx.send_async(chunk.to_vec()).await.unwrap();
}
drop(tx); // close the channel to signal EOF
}
tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap();
Ok(())
}
// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
rx: flume::Receiver<Vec<u8>>,
current: io::Cursor<Vec<u8>>,
}
impl ChannelRead {
fn new(rx: flume::Receiver<Vec<u8>>) -> ChannelRead {
ChannelRead {
rx,
current: io::Cursor::new(vec![]),
}
}
}
impl Read for ChannelRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.current.position() == self.current.get_ref().len() as u64 {
// We've exhausted the previous chunk, get a new one.
if let Ok(vec) = self.rx.recv() {
self.current = io::Cursor::new(vec);
}
// If recv() "fails", it means the sender closed its part of
// the channel, which means EOF. Propagate EOF by allowing
// a read from the exhausted cursor.
}
self.current.read(buf)
}
}
我需要在下载过程中解压缩和解压大的 .tar.gz 文件(例如 ~5Gb),而不在磁盘上保存存档文件。 我使用 reqwest crate 下载文件,flate2 crate 解压,tar crate 解包。我尝试用 tar.gz 格式来做。但是可以使用 zip 和 tar.bz2 格式。 (哪个更容易使用?) 看起来我成功实现了这个,但没想到解包以错误结束:
thread 'main' panicked at 'Cannot unpack archive: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `gamedata-master/.vscode/settings.json` into `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Kind(UnexpectedEof) } } } }', /home/ruut/Projects/GreatWar/launcher/src/gitlab.rs:87:38
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
我的代码:
let full_url = format!("{}/{}/{}", HOST, repo_info.url, repo_info.download_url);
let mut response;
match self.client.get(&full_url).send().await {
Ok(res) => response = res,
Err(error) => {
return Err(Error::new(ErrorKind::InvalidData, error));
}
};
if response.status() == reqwest::StatusCode::OK {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item
.or(Err(format!("Error while downloading file")))
.unwrap();
let b: &[u8] = &chunk.to_vec();
let gz = GzDecoder::new(b);
let mut archive = Archive::new(gz);
archive.unpack("./gamedata").expect("Cannot unpack archive");
}
}
第一次获取块后archive.unpack
抛出错误。
我做错了什么?
Rust 具有 std::io::BufRead
特性。它管理一个内部缓冲区,可以被填充和使用,这使得它非常适合在没有中间 collect
s 的情况下传递数据。
由于 reqwest.Response
实现了 Read
,我们可以将它变成 BufReader
并传递给 flate2::bufread::GzDecoder
。
你的问题是 GzDecoder::new
期望传递给它的所有内容都是完整的存档,但除了第一个块之外的任何内容显然都不是。
最小示例(为简单起见,使用阻塞 API):
use flate2::bufread::GzDecoder;
use std::io::BufReader;
use tar::Archive;
fn main() {
let resp = reqwest::blocking::get(URL).unwrap();
let content_br = BufReader::new(resp);
let tarfile = GzDecoder::new(content_br);
let mut archive = Archive::new(tarfile);
archive.unpack("./gamedata").unwrap();
}
kmdreko 的评论解释了为什么您的代码失败 - .next()
returns 只有第一个块,您必须将所有块提供给 gzip reader。另一个答案显示了如何使用阻塞 reqwest
API.
如果你想继续使用非阻塞API,那么你可以在一个单独的线程中启动解码器,并通过一个通道向它提供数据。例如,您可以使用 flume 个同时支持同步和异步接口的频道。您还需要按照 GzDecoder
的预期将频道转换为 Read
的内容。例如(编译,但未经测试):
use std::io::{self, Read};
use flate2::read::GzDecoder;
use futures_lite::StreamExt;
use tar::Archive;
async fn download() -> io::Result<()> {
let client = reqwest::Client::new();
let full_url = "...";
let response;
match client.get(full_url).send().await {
Ok(res) => response = res,
Err(error) => {
return Err(io::Error::new(io::ErrorKind::InvalidData, error));
}
};
let (tx, rx) = flume::bounded(0);
let decoder_thread = std::thread::spawn(move || {
let input = ChannelRead::new(rx);
let gz = GzDecoder::new(input);
let mut archive = Archive::new(gz);
archive.unpack("./gamedata").unwrap();
});
if response.status() == reqwest::StatusCode::OK {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item
.or(Err(format!("Error while downloading file")))
.unwrap();
tx.send_async(chunk.to_vec()).await.unwrap();
}
drop(tx); // close the channel to signal EOF
}
tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap();
Ok(())
}
// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
rx: flume::Receiver<Vec<u8>>,
current: io::Cursor<Vec<u8>>,
}
impl ChannelRead {
fn new(rx: flume::Receiver<Vec<u8>>) -> ChannelRead {
ChannelRead {
rx,
current: io::Cursor::new(vec![]),
}
}
}
impl Read for ChannelRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.current.position() == self.current.get_ref().len() as u64 {
// We've exhausted the previous chunk, get a new one.
if let Ok(vec) = self.rx.recv() {
self.current = io::Cursor::new(vec);
}
// If recv() "fails", it means the sender closed its part of
// the channel, which means EOF. Propagate EOF by allowing
// a read from the exhausted cursor.
}
self.current.read(buf)
}
}