How to decompress XZ data from a hyper::Response on the fly?

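I'm using hyper to download an XZ file. I would like to save it to disk in decompressed form by extracting as much as possible from each incoming Chunk and writing the result to disk immediately, rather than first downloading the entire file and then decompressing it.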

There is the xz2 crate, which implements the XZ format. However, its XzDecoder does not seem to support a Python-like decompressobj model, where a caller repeatedly feeds partial input and gets partial output.
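In the decompressobj model the decoder object keeps its own state between calls, so the input can be cut at arbitrary points and the caller just keeps pushing chunks. The stdlib-only sketch below illustrates that calling model. It uses a hypothetical HexDecoder (hex text to bytes) as a stand-in for XZ. It is not part of xz2 and only shows how state carries across chunk boundaries.

```rust
/// Hypothetical push-style decoder: hex text in, raw bytes out.
/// It stands in for a real decompressor only to show the calling model.
struct HexDecoder {
    // High nibble left over when a chunk ended between two hex digits.
    pending: Option<u8>,
}

impl HexDecoder {
    fn new() -> Self {
        HexDecoder { pending: None }
    }

    /// Feeds one partial chunk and appends whatever output it completes.
    fn decompress(&mut self, input: &[u8], out: &mut Vec<u8>) -> Result<(), String> {
        for &c in input {
            let nibble = (c as char)
                .to_digit(16)
                .ok_or_else(|| format!("invalid hex digit {:?}", c as char))? as u8;
            match self.pending.take() {
                // Second digit of a pair: emit the completed byte.
                Some(high) => out.push((high << 4) | nibble),
                // First digit: remember it, possibly until the next chunk.
                None => self.pending = Some(nibble),
            }
        }
        Ok(())
    }

    /// Signals end of input; an unpaired digit means the data was truncated.
    fn finish(self) -> Result<(), String> {
        match self.pending {
            Some(_) => Err("truncated input".to_string()),
            None => Ok(()),
        }
    }
}
```

Here the split "6" / "5" across two chunks still decodes to one byte, which is the property a chunk-by-chunk XZ decoder needs.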

Instead, XzDecoder receives its input bytes through a Read parameter, and I'm not sure how to glue these two things together. Is there a way to feed a Response to XzDecoder?

The only clue I have found so far is this issue. It refers to a private ReadableChunks type that I could in theory replicate in my code, but maybe there is an easier way?
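The ReadableChunks idea boils down to an adapter that implements std::io::Read on top of a sequence of chunks, keeping the unread tail of the current chunk between read calls. Below is a stdlib-only sketch that makes a simplifying assumption: the chunks come from a blocking Iterator<Item = Vec<u8>> rather than from hyper's asynchronous body stream. It therefore shows the buffering logic, not how to bridge async to sync code. ChunkReader is a hypothetical name.

```rust
use std::io::{self, Read};

/// Hypothetical adapter: exposes an iterator of byte chunks as `io::Read`,
/// which is the kind of input a Read-based decoder expects.
struct ChunkReader<I: Iterator<Item = Vec<u8>>> {
    chunks: I,
    current: Vec<u8>, // chunk currently being served
    pos: usize,       // bytes of `current` already handed out
}

impl<I: Iterator<Item = Vec<u8>>> ChunkReader<I> {
    fn new(chunks: I) -> Self {
        ChunkReader { chunks, current: Vec::new(), pos: 0 }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Read for ChunkReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Move on to the next non-empty chunk; Ok(0) signals end of stream.
        while self.pos == self.current.len() {
            match self.chunks.next() {
                Some(chunk) => {
                    self.current = chunk;
                    self.pos = 0;
                }
                None => return Ok(0),
            }
        }
        // Copy as much of the current chunk as fits into `buf`.
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```

With the xz2 crate, such a reader could then be passed to xz2::read::XzDecoder::new, provided that blocking reads are acceptable in the surrounding code.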

> XzDecoder does not seem to support a Python-like decompressobj model, where a caller repeatedly feeds partial input and gets partial output

xz2::stream::Stream does what you need. Here is some very rough, untested code that still needs proper error handling and so on, but I hope you get the idea:

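use futures::{Future, Stream};
use xz2::stream::Action;

fn process(body: hyper::body::Body) {
    // The first argument is a memory limit in bytes; 1000 is far too low for real XZ data
    let mut decoder = xz2::stream::Stream::new_stream_decoder(std::u64::MAX, 0).unwrap();
    body.for_each(|chunk| {
        // process_vec only fills the Vec's spare capacity, so reserve some up front
        let mut buf: Vec<u8> = Vec::with_capacity(8192);
        // A single call may not consume the whole chunk; a real version must loop
        if let Ok(_) = decoder.process_vec(&chunk, &mut buf, Action::Run) {
            // write buf to disk
        }
        Ok(())
    }).wait().unwrap();
}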
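One detail matters when using process_vec. According to xz2's documentation, it writes only into the spare capacity (capacity minus length) of the output Vec and never grows it. A buffer created with Vec::new() therefore receives no output at all, and a single call may stop early once the buffer is full. The stdlib-only snippet below mimics just that output contract. mock_process_vec is hypothetical and copies bytes instead of decompressing.

```rust
/// Hypothetical stand-in with the same output contract as xz2's
/// `Stream::process_vec`: bytes go only into the Vec's spare capacity,
/// and the Vec is never reallocated. This mock copies input verbatim.
/// Returns the number of input bytes consumed.
fn mock_process_vec(input: &[u8], output: &mut Vec<u8>) -> usize {
    let spare = output.capacity() - output.len();
    let n = spare.min(input.len());
    // n <= spare, so this never reallocates.
    output.extend_from_slice(&input[..n]);
    n
}
```

The remaining input has to be fed in again on later calls, which is why a fixed-capacity buffer is usually paired with a loop.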

Based on this, I came up with the following working code:

extern crate failure;
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio;
extern crate xz2;

use std::fs::File;
use std::io::Write;
use std::u64;

use failure::Error;
use futures::future::done;
use futures::stream::Stream;
use hyper::{Body, Chunk, Response};
use hyper::rt::Future;
use hyper_tls::HttpsConnector;
use tokio::runtime::Runtime;

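fn decode_chunk(file: &mut File, xz: &mut xz2::stream::Stream, chunk: &Chunk)
                -> Result<(), Error> {
    // Part of the chunk not yet consumed by the decoder.
    let mut input: &[u8] = chunk;
    let mut buf = Vec::with_capacity(8192);
    loop {
        buf.clear();
        let before = xz.total_in();
        let status = xz.process_vec(input, &mut buf, xz2::stream::Action::Run)?;
        // Skip the bytes the decoder consumed in this call.
        input = &input[(xz.total_in() - before) as usize..];
        file.write_all(&buf)?;
        // Stop at the end of the XZ stream (avoids spinning on trailing bytes),
        // or once all input is consumed and the output buffer was not filled,
        // i.e. the decoder has no more pending output for this chunk.
        if let xz2::stream::Status::StreamEnd = status {
            break;
        }
        if input.is_empty() && buf.len() < buf.capacity() {
            break;
        }
    }
    Ok(())
}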

fn decode_response(mut file: File, response: Response<Body>)
                   -> impl Future<Item=(), Error=Error> {
    done(xz2::stream::Stream::new_stream_decoder(u64::MAX, 0)
        .map_err(Error::from))
        .and_then(|mut xz| response
            .into_body()
            .map_err(Error::from)
            .for_each(move |chunk| done(
                decode_chunk(&mut file, &mut xz, &chunk))))
}

fn main() -> Result<(), Error> {
    let client = hyper::Client::builder().build::<_, hyper::Body>(
        HttpsConnector::new(1)?);
    let file = File::create("hello-2.7.tar")?;
    let mut runtime = Runtime::new()?;
    runtime.block_on(client
        .get("https://ftp.gnu.org/gnu/hello/hello-2.7.tar.xz".parse()?)
        .map_err(Error::from)
        .and_then(|response| decode_response(file, response)))?;
    runtime.shutdown_now();
    Ok(())
}
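decode_chunk drives the decoder with a fixed-size output buffer. A robust form of that pattern keeps calling the decoder while input remains, and also while the previous call filled the whole buffer, because a decoder may have consumed all of its input yet still hold pending output. The stdlib-only sketch below shows the pattern. It uses a hypothetical Decoder trait and a toy Doubler that emits every byte twice, and neither is part of xz2.

```rust
/// Hypothetical push-style decoder interface (not xz2's API): consumes a
/// prefix of `input`, writes at most `output.len()` bytes, and returns
/// (bytes consumed, bytes written).
trait Decoder {
    fn step(&mut self, input: &[u8], output: &mut [u8]) -> (usize, usize);
}

/// Toy decoder that emits every input byte twice. It buffers expanded bytes
/// internally, so it can consume all input while output is still pending.
struct Doubler {
    pending: Vec<u8>,
}

impl Decoder for Doubler {
    fn step(&mut self, input: &[u8], output: &mut [u8]) -> (usize, usize) {
        // Take all input into the internal buffer.
        for &b in input {
            self.pending.push(b);
            self.pending.push(b);
        }
        // Emit as much pending output as fits.
        let n = output.len().min(self.pending.len());
        output[..n].copy_from_slice(&self.pending[..n]);
        self.pending.drain(..n);
        (input.len(), n)
    }
}

/// Feeds one chunk and appends all output it produces to `sink`.
fn feed_chunk<D: Decoder>(dec: &mut D, mut input: &[u8], sink: &mut Vec<u8>) {
    let mut buf = [0u8; 4]; // deliberately tiny to force several iterations
    loop {
        let (consumed, written) = dec.step(input, &mut buf);
        sink.extend_from_slice(&buf[..written]);
        input = &input[consumed..];
        // Done only when input is exhausted AND the buffer was not filled;
        // stopping as soon as input is empty would strand pending output.
        if input.is_empty() && written < buf.len() {
            break;
        }
    }
}
```

For the chunk "abc", the first call consumes all three bytes but can only emit "aabb". The second call, with empty input, flushes the remaining "cc".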