如何在下载过程中解压缩和解压缩 tar.gz 存档?

How decompress and unpack tar.gz archive in download process?

我需要在下载过程中解压缩和解压大的 .tar.gz 文件(例如 ~5Gb),而不在磁盘上保存存档文件。 我使用 reqwest crate 下载文件,flate2 crate 解压,tar crate 解包。我尝试用 tar.gz 格式来做。但是可以使用 zip 和 tar.bz2 格式。 (哪个更容易使用?) 看起来我成功实现了这个,但没想到解包以错误结束:

thread 'main' panicked at 'Cannot unpack archive: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Custom { kind: UnexpectedEof, error: TarError { desc: "failed to unpack `gamedata-master/.vscode/settings.json` into `/home/ruut/Projects/GreatWar/launcher/gamedata/gamedata-master/.vscode/settings.json`", io: Kind(UnexpectedEof) } } } }', /home/ruut/Projects/GreatWar/launcher/src/gitlab.rs:87:38
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

我的代码:

let full_url = format!("{}/{}/{}", HOST, repo_info.url, repo_info.download_url);
let mut response;

match self.client.get(&full_url).send().await {
  Ok(res) => response = res,
  Err(error) => {
    return Err(Error::new(ErrorKind::InvalidData, error));
  }
};

if response.status() == reqwest::StatusCode::OK {
  let mut stream = response.bytes_stream();

  while let Some(item) = stream.next().await {
    let chunk = item
      .or(Err(format!("Error while downloading file")))
      .unwrap();

    let b: &[u8] = &chunk.to_vec();
    let gz = GzDecoder::new(b);
    let mut archive = Archive::new(gz);

    archive.unpack("./gamedata").expect("Cannot unpack archive");
  }
}

第一次获取块后archive.unpack抛出错误。
我做错了什么?

Rust 具有 std::io::BufRead 特性。它管理一个内部缓冲区,可以被填充和使用,这使得它非常适合在没有中间 collects 的情况下传递数据。

由于 reqwest.Response 实现了 Read,我们可以将它变成 BufReader 并传递给 flate2::bufread::GzDecoder

你的问题是 GzDecoder::new 期望传递给它的所有内容都是完整的存档,但除了第一个块之外的任何内容显然都不是。

最小示例(为简单起见,使用阻塞 API):

use flate2::bufread::GzDecoder;
use std::io::BufReader;
use tar::Archive;

fn main() {
    let resp = reqwest::blocking::get(URL).unwrap();
    let content_br = BufReader::new(resp);
    let tarfile = GzDecoder::new(content_br);
    let mut archive = Archive::new(tarfile);
    archive.unpack("./gamedata").unwrap();
}

kmdreko 的评论解释了为什么您的代码失败 - .next() returns 只有第一个块,您必须将所有块提供给 gzip reader。另一个答案显示了如何使用阻塞 reqwest API.

如果你想继续使用非阻塞API,那么你可以在一个单独的线程中启动解码器,并通过一个通道向它提供数据。例如,您可以使用 flume 个同时支持同步和异步接口的频道。您还需要按照 GzDecoder 的预期将频道转换为 Read 的内容。例如(编译,但未经测试):

use std::io::{self, Read};

use flate2::read::GzDecoder;
use futures_lite::StreamExt;
use tar::Archive;

async fn download() -> io::Result<()> {
    let client = reqwest::Client::new();

    let full_url = "...";
    let response;

    match client.get(full_url).send().await {
        Ok(res) => response = res,
        Err(error) => {
            return Err(io::Error::new(io::ErrorKind::InvalidData, error));
        }
    };

    let (tx, rx) = flume::bounded(0);

    let decoder_thread = std::thread::spawn(move || {
        let input = ChannelRead::new(rx);
        let gz = GzDecoder::new(input);
        let mut archive = Archive::new(gz);
        archive.unpack("./gamedata").unwrap();
    });

    if response.status() == reqwest::StatusCode::OK {
        let mut stream = response.bytes_stream();

        while let Some(item) = stream.next().await {
            let chunk = item
                .or(Err(format!("Error while downloading file")))
                .unwrap();
            tx.send_async(chunk.to_vec()).await.unwrap();
        }
        drop(tx); // close the channel to signal EOF
    }

    tokio::task::spawn_blocking(|| decoder_thread.join())
        .await
        .unwrap()
        .unwrap();

    Ok(())
}

// Wrap a channel into something that impls `io::Read`
struct ChannelRead {
    rx: flume::Receiver<Vec<u8>>,
    current: io::Cursor<Vec<u8>>,
}

impl ChannelRead {
    fn new(rx: flume::Receiver<Vec<u8>>) -> ChannelRead {
        ChannelRead {
            rx,
            current: io::Cursor::new(vec![]),
        }
    }
}

impl Read for ChannelRead {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.current.position() == self.current.get_ref().len() as u64 {
            // We've exhausted the previous chunk, get a new one.
            if let Ok(vec) = self.rx.recv() {
                self.current = io::Cursor::new(vec);
            }
            // If recv() "fails", it means the sender closed its part of
            // the channel, which means EOF. Propagate EOF by allowing
            // a read from the exhausted cursor.
        }
        self.current.read(buf)
    }
}