How to download a large file with hyper and resume on error?
I want to download a large file (500 MB) with hyper, and be able to resume the download if it fails.

Is there any way with hyper to run some function for each chunk of data received? The `send()` method returns a `Result<Response>`, but I can't find any method on `Response` that returns an iterator over chunks. Ideally I'd like to do something like:
client.get(&url.to_string())
    .send()
    .map(|mut res| {
        let mut chunk = String::new();
        // write this chunk to disk
    });
Is this possible, or will `map` only be called once hyper has downloaded the entire file?
Is there any way with hyper to run some function for each chunk of data received?
Hyper's `Response` implements `Read`. This means that `Response` is a stream, and you can read arbitrary chunks of data from it just as you normally would from a stream.
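Since any `Read` behaves the same way, the chunked-read pattern can be sketched with the standard library only. This is a minimal sketch, not Hyper's API: `Cursor` stands in for the response body so the example runs without a network, and the helper name `drain_in_chunks` is made up for illustration.

```rust
use std::io::{Cursor, Read};

/// Drains any `Read` in fixed-size chunks, returning the total byte count.
/// With Hyper, `reader` would be the `Response` itself.
fn drain_in_chunks<R: Read>(mut reader: R) -> std::io::Result<usize> {
    let mut buf = [0u8; 128 * 1024];
    let mut total = 0;
    loop {
        match reader.read(&mut buf) {
            Ok(0) => break,      // EOF: the body has been fully read.
            Ok(n) => total += n, // here one would write `buf[..n]` to disk
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(total)
}

fn main() {
    // Pretend this in-memory buffer is the HTTP response body.
    let response = Cursor::new(vec![0u8; 300_000]);
    let total = drain_in_chunks(response).unwrap();
    println!("downloaded {} bytes", total);
}
```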
For what it's worth, here's a piece of code I use to download large files from ICECat. I'm using the `Read` interface in order to display the download progress in the terminal.

The variable `response` here is an instance of Hyper's `Response`.
{
    let mut file = try_s!(fs::File::create(&tmp_path));
    let mut deflate = try_s!(GzDecoder::new(response));
    let mut buf = [0; 128 * 1024];
    let mut written = 0;
    loop {
        status_line!("icecat_fetch] " (url) ": " (written / 1024 / 1024) " MiB.");
        let len = match deflate.read(&mut buf) {
            Ok(0) => break, // EOF.
            Ok(len) => len,
            Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => return ERR!("{}: Download failed: {}", url, err),
        };
        try_s!(file.write_all(&buf[..len]));
        written += len;
    }
}
try_s!(fs::rename(tmp_path, target_path));
status_line_clear();
I want to download large files (500 MB) with hyper, and be able to resume if the download fails.
This is usually implemented with the HTTP "Range" header (cf. RFC 7233).

Not every server supports the "Range" header. I've seen a lot of servers with a custom HTTP stack and without proper "Range" support, or with the "Range" header disabled for some reason. So skipping the already-received chunks of Hyper's `Response` might be a necessary fallback.

But if you want to speed things up and save traffic, then the primary means of resuming a stopped download should be the "Range" header.
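As a minimal sketch of the "Range" approach: before retrying, check how much of the temporary file is already on disk and build a `bytes=<offset>-` value for the request's `Range` header. A server that honors it replies `206 Partial Content` and you append the body to the file; a plain `200 OK` means ranges aren't supported and you must restart from scratch. The helper name `resume_range_header` is made up for illustration; only the standard library is used here.

```rust
use std::fs;
use std::io::Write;

/// Builds the "Range" header value for resuming a download of `path`.
/// Returns None when nothing was downloaded yet (a plain GET suffices).
fn resume_range_header(path: &str) -> Option<String> {
    let len = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
    if len == 0 {
        None
    } else {
        // RFC 7233: an open-ended range from the first missing byte.
        Some(format!("bytes={}-", len))
    }
}

fn main() {
    // Simulate a partial download of 1024 bytes.
    let tmp = std::env::temp_dir().join("partial_download.bin");
    fs::File::create(&tmp).unwrap().write_all(&[0u8; 1024]).unwrap();
    println!("{:?}", resume_range_header(tmp.to_str().unwrap()));
    fs::remove_file(&tmp).unwrap();
}
```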
P.S. With Hyper 0.12, the response body returned by Hyper is a `Stream`, and to run some function for each chunk of data received we can use the `for_each` stream combinator:
extern crate futures;
extern crate futures_cpupool;
extern crate hyper; // 0.12
extern crate hyper_rustls;

use futures::Future;
use futures_cpupool::CpuPool;
use hyper::rt::Stream;
use hyper::{Body, Client, Request};
use hyper_rustls::HttpsConnector;
use std::thread;
use std::time::Duration;

fn main() {
    let url = "https://steemitimages.com/DQmYWcEumaw1ajSge5PcGpgPpXydTkTcqe1daF4Ro3sRLDi/IMG_20130103_103123.jpg";

    // In real life we'd want an asynchronous reactor, such as the tokio_core,
    // but for a short example the `CpuPool` should do.
    let pool = CpuPool::new(1);
    let https = HttpsConnector::new(1);
    let client = Client::builder().executor(pool.clone()).build(https);

    // `unwrap` is used because there are different ways (and/or libraries)
    // to handle the errors and you should pick one yourself.
    // Also to keep this example simple.
    let req = Request::builder().uri(url).body(Body::empty()).unwrap();
    let fut = client.request(req);

    // Rebinding (shadowing) the `fut` variable allows us (in smart IDEs)
    // to more easily examine the gradual weaving of the types.
    let fut = fut.then(move |res| {
        let res = res.unwrap();
        println!("Status: {:?}.", res.status());
        let body = res.into_body();
        // `for_each` returns a `Future` that we must embed
        // into our chain of futures in order to execute it.
        body.for_each(move |chunk| {
            println!("Got a chunk of {} bytes.", chunk.len());
            Ok(())
        })
    });

    // Handle the errors: we need error-free futures for `spawn`.
    let fut = fut.then(move |r| -> Result<(), ()> {
        r.unwrap();
        Ok(())
    });

    // Spawning the future onto a runtime starts executing it in background.
    // If not spawned onto a runtime the future will be executed in `wait`.
    //
    // Note that we should keep the future around.
    // To save resources most implementations would *cancel* the dropped futures.
    let _fut = pool.spawn(fut);

    thread::sleep(Duration::from_secs(1)); // or `_fut.wait()`.
}