Hyper client with tokio File stuck at write()
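I am trying to make a fully async download.
The download itself works fine so far.
It works with std::fs::File, but I wanted to try tokio's File to make the code fully async.
If I just download the file and discard the data, it works. But when I use tokio::fs::File to write the data to disk asynchronously, the download gets stuck at random positions: sometimes at 1.1 MB, mostly at ~1.6 MB, out of a total of ~9 MB.
My test URL is https://github.com/Kitware/CMake/releases/download/v3.20.5/cmake-3.20.5.tar.gz
The last output I get is the debug!("Received...") line.
The output, nearly complete, is: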
DEBUG: Temp File: /tmp/26392_1625868800106141_ZhWUtnaD.tmp
DEBUG: add_pem_file processed 133 valid and 0 invalid certs
DEBUG: No cached session for DNSNameRef("github.com")
DEBUG: Not resuming any session
DEBUG: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
DEBUG: Not resuming
DEBUG: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])]
DEBUG: ALPN protocol is Some(b"h2")
DEBUG: Ticket saved
DEBUG: Ticket saved
DEBUG: Status: 302 Found
[...]
DEBUG: content-length: 621
DEBUG: Sending warning alert CloseNotify
DEBUG: add_pem_file processed 133 valid and 0 invalid certs
DEBUG: No cached session for DNSNameRef("github-releases.githubusercontent.com")
DEBUG: Not resuming any session
DEBUG: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
DEBUG: Not resuming
DEBUG: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])]
DEBUG: ALPN protocol is Some(b"h2")
DEBUG: Ticket saved
DEBUG: Status: 200 OK
[...]
DEBUG: content-length: 9441947
DEBUG: Received 16384 bytes (16384 total)
DEBUG: Written 16384 bytes (16384 total)
DEBUG: Received 9290 bytes (25674 total)
DEBUG: Written 9290 bytes (25674 total)
DEBUG: Received 16384 bytes (42058 total)
DEBUG: Written 16384 bytes (42058 total)
[...]
DEBUG: Received 8460 bytes (1192010 total)
DEBUG: Written 8460 bytes (1192010 total)
DEBUG: Received 8948 bytes (1200958 total)
DEBUG: Written 8948 bytes (1200958 total)
DEBUG: Received 8460 bytes (1209418 total)
DEBUG: Written 8460 bytes (1209418 total)
DEBUG: Received 8948 bytes (1218366 total)
[PROCESS STUCK HERE]
It feels like there is a deadlock, or something is blocking the write, but I cannot find what is wrong. Why does it get stuck?
Code:
async fn download_http<P: AsRef<Path>>(url: &Url, localpath: P) -> MyResult<()> {
    let mut uri = hyper::Uri::from_str(url.as_str())?;
    let mut total_read: usize = 0;
    let mut total_written: usize = 0;
    let mut localfile = File::create(localpath).await?;
    // Redirection Limit
    for i in 0..10 {
        let https = HttpsConnector::with_native_roots();
        let client = Client::builder().build::<_, hyper::Body>(https);
        let mut resp = client.get(uri.clone()).await?;
        let status = resp.status();
        let header = resp.headers();
        debug!("Status: {}", status);
        for (key, value) in resp.headers() {
            debug!("HEADER {}: {}", key, value.to_str().unwrap());
        }
        if status.is_success() {
            // tokio::io::copy(&mut resp.body_mut().data(), &mut localfile).await?;
            let expected_size = header.get("content-length").map(|v| v.to_str().unwrap().parse::<usize>().unwrap());
            while let Some(next) = resp.data().await {
                let mut chunk = next?;
                let num_bytes = chunk.len();
                total_read = total_read + num_bytes;
                debug!("Received {} bytes ({} total)", num_bytes, total_read);
                // localfile.write_all(&chunk).await?;
                let written = localfile.write(&chunk).await?;
                total_written = total_written + written;
                debug!("Written {} bytes ({} total)", written, total_written);
                if total_read != total_written {
                    error!("Could not write all data!");
                }
                if expected_size.is_some() && total_read.eq(&expected_size.unwrap()) {
                    return Ok(());
                }
            }
            return Ok(());
        } else if status.is_redirection() {
            let location = header.get("location").unwrap().to_str().unwrap();
            uri = hyper::Uri::from_str(location)?;
        } else {
            let uri_str = uri.to_string();
            return Err(MyError::CustomError(CustomError::from_string(format!("HTTP responded with status {}: {}", status, uri_str))))
        }
    }
    Err(MyError::CustomError(CustomError::from_string(format!("HTTP too many redirections"))))
}
Crates (incomplete, only the relevant ones):
futures = "0.3"
futures-cpupool = "0.1"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.22"
rustls = "0.19"
tokio = { version = "1.6", features = ["full"] }
url = "2.2"
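As you can see, the download loop matches the example code from the hyper documentation.
I added the tokio::fs::File writing part.
I added the debug output (mostly byte counts) to figure out what happens and where.
The commented-out lines show the ideal way: use write_all or, if possible, io::copy.
But I cannot get it to work without getting stuck.
Could you please point out where my mistake is?
Thank you very much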
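Thanks to @HHK for the comment above.
He suggested building a minimal, reproducible example. When I did, the example ran fine.
So I iteratively added code from the original project.
The last piece I added was a leftover that I had not removed while making the project async and learning async.
I had a futures::executor::block_on call inside an async function that called another async function, which blocked the whole program at random points.
So I should have built a complete, working piece of code before posting; that would have led me to the original problem and saved me a lot of trouble.
For future readers: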
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.22"
rustls = "0.19"
log = "0.4"
tokio = { version = "1.6", features = ["full"] }
url = "2.2"
Code:
use std::io::{stderr, stdout, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;

use futures::executor::block_on;
use hyper::body::HttpBody;
use hyper::Client;
use hyper_rustls::HttpsConnector;
use log::{debug, error, LevelFilter, Log, Metadata, Record};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use url::Url;

async fn download_http<P: AsRef<Path>>(url: &Url, localpath: P) -> Result<(), ()> {
    let mut uri = hyper::Uri::from_str(url.as_str()).unwrap();
    let mut total_read: usize = 0;
    let mut total_written: usize = 0;
    let mut localfile = File::create(localpath).await.unwrap();
    // Redirection Limit
    for _ in 0..10 {
        let https = HttpsConnector::with_native_roots();
        let client = Client::builder().build::<_, hyper::Body>(https);
        let mut resp = client.get(uri.clone()).await.unwrap();
        let status = resp.status();
        let header = resp.headers();
        debug!("Status: {}", status);
        for (key, value) in resp.headers() {
            debug!("HEADER {}: {}", key, value.to_str().unwrap());
        }
        if status.is_success() {
            // tokio::io::copy(&mut resp.body_mut().data(), &mut localfile).await.unwrap();
            let expected_size = header.get("content-length").map(|v| v.to_str().unwrap().parse::<usize>().unwrap());
            while let Some(next) = resp.data().await {
                let chunk = next.unwrap();
                let num_bytes = chunk.len();
                total_read = total_read + num_bytes;
                debug!("Received {} bytes ({} total)", num_bytes, total_read);
                // localfile.write_all(&chunk).await.unwrap();
                let written = localfile.write(&chunk).await.unwrap();
                total_written = total_written + written;
                debug!("Written {} bytes ({} total)", written, total_written);
                if total_read != total_written {
                    error!("Could not write all data!");
                }
                if expected_size.is_some() && total_read.eq(&expected_size.unwrap()) {
                    return Ok(());
                }
            }
            return Ok(());
        } else if status.is_redirection() {
            let location = header.get("location").unwrap().to_str().unwrap();
            uri = hyper::Uri::from_str(location).unwrap();
        } else {
            return Err(());
        }
    }
    return Err(());
}

struct Logger;

impl Log for Logger {
    fn enabled(&self, _: &Metadata) -> bool {
        true
    }
    fn log(&self, record: &Record) {
        eprintln!("{}: {}", record.level().as_str().to_uppercase(), record.args());
        stdout().flush().unwrap();
        stderr().flush().unwrap();
    }
    fn flush(&self) {
        stdout().flush().unwrap();
        stderr().flush().unwrap();
    }
}

static LOGGER: Logger = Logger;

#[tokio::main]
async fn main() {
    log::set_logger(&LOGGER).map(move |()| log::set_max_level(LevelFilter::Debug)).unwrap();
    let url = Url::parse("https://github.com/Kitware/CMake/releases/download/v3.20.5/cmake-3.20.5.tar.gz").unwrap();
    let localfile = PathBuf::from("/tmp/cmake-3.20.5.tar.gz");
    block_on(download_http(&url, &localfile)).unwrap();
    // download_http(&url, &localfile).await.unwrap();
}
Switching between the block_on line and the plain .await line makes the difference.
Now I can switch back to write_all and remove my debug code.
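A side note on why write_all is the safer call once the hang is gone: a single write is allowed to accept only part of the buffer, and tokio's AsyncWriteExt::write documents the same contract as std's Write::write. The sketch below is only an illustration using the synchronous std::io::Write trait as a stand-in for the async one. ShortWriter, write_once and write_fully are hypothetical names made up for this example and are not part of the project above.

```rust
use std::io::{self, Write};

/// Hypothetical writer that accepts at most `limit` bytes per `write` call,
/// standing in for a file or socket that performs short writes.
struct ShortWriter {
    data: Vec<u8>,
    limit: usize,
}

impl Write for ShortWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Accept only the first `limit` bytes and report how many were taken.
        let n = buf.len().min(self.limit);
        self.data.extend_from_slice(&buf[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// One `write` call: returns how many bytes were actually accepted.
fn write_once(chunk: &[u8], limit: usize) -> io::Result<usize> {
    let mut w = ShortWriter { data: Vec::new(), limit };
    w.write(chunk)
}

/// `write_all` keeps calling `write` until the whole chunk is stored.
fn write_fully(chunk: &[u8], limit: usize) -> io::Result<usize> {
    let mut w = ShortWriter { data: Vec::new(), limit };
    w.write_all(chunk)?;
    Ok(w.data.len())
}

fn main() -> io::Result<()> {
    let chunk = [0u8; 10];
    println!("write:     {} of {} bytes", write_once(&chunk, 4)?, chunk.len());
    println!("write_all: {} of {} bytes", write_fully(&chunk, 4)?, chunk.len());
    Ok(())
}
```

In the download loop this corresponds to replacing localfile.write(&chunk).await with the commented-out localfile.write_all(&chunk).await, which also makes the total_read/total_written comparison unnecessary.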