带有 tokio 文件的超级客户端卡在 write()

Hyper client with tokio File stuck at write()

我正在尝试进行全异步下载。

到目前为止下载工作正常。

使用 std::fs::File 它工作正常,但我想尝试使用 tokios 文件使代码完全异步。

如果我只是下载文件并让数据消失,就可以了。但是当我使用 tokio::fs::File 将数据异步写入磁盘时,下载会卡在随机位置。有时为 1.1MB,大部分为 ~1.6MB。总计约 9MB。

我的测试URL是https://github.com/Kitware/CMake/releases/download/v3.20.5/cmake-3.20.5.tar.gz

我得到的最后一个输出是 debug!("Received...") 行。

接近完成的输出是:

DEBUG: Temp File: /tmp/26392_1625868800106141_ZhWUtnaD.tmp

DEBUG: add_pem_file processed 133 valid and 0 invalid certs
DEBUG: No cached session for DNSNameRef("github.com")
DEBUG: Not resuming any session
DEBUG: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
DEBUG: Not resuming
DEBUG: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])]
DEBUG: ALPN protocol is Some(b"h2")
DEBUG: Ticket saved
DEBUG: Ticket saved
DEBUG: Status: 302 Found
[...]
DEBUG: content-length: 621
DEBUG: Sending warning alert CloseNotify

DEBUG: add_pem_file processed 133 valid and 0 invalid certs
DEBUG: No cached session for DNSNameRef("github-releases.githubusercontent.com")
DEBUG: Not resuming any session
DEBUG: Using ciphersuite TLS13_CHACHA20_POLY1305_SHA256
DEBUG: Not resuming
DEBUG: TLS1.3 encrypted extensions: [ServerNameAck, Protocols([PayloadU8([104, 50])])]
DEBUG: ALPN protocol is Some(b"h2")
DEBUG: Ticket saved
DEBUG: Status: 200 OK
[...]
DEBUG: content-length: 9441947

DEBUG: Received 16384 bytes (16384 total)
DEBUG: Written 16384 bytes (16384 total)
DEBUG: Received 9290 bytes (25674 total)
DEBUG: Written 9290 bytes (25674 total)
DEBUG: Received 16384 bytes (42058 total)
DEBUG: Written 16384 bytes (42058 total)
[...]
DEBUG: Received 8460 bytes (1192010 total)
DEBUG: Written 8460 bytes (1192010 total)
DEBUG: Received 8948 bytes (1200958 total)
DEBUG: Written 8948 bytes (1200958 total)
DEBUG: Received 8460 bytes (1209418 total)
DEBUG: Written 8460 bytes (1209418 total)
DEBUG: Received 8948 bytes (1218366 total)
[PROCESS STUCK HERE]

感觉好像有死锁或者什么东西在阻止写入。但我无法找出问题所在。为什么会卡住?

代码:

async fn download_http<P: AsRef<Path>>(url: &Url, localpath: P) -> MyResult<()> {
    let mut uri = hyper::Uri::from_str(url.as_str())?;

    let mut total_read: usize = 0;
    let mut total_written: usize = 0;
    let mut localfile = File::create(localpath).await?;

    // Redirection Limit
    for i in 0..10 {
        let https = HttpsConnector::with_native_roots();
        let client = Client::builder().build::<_, hyper::Body>(https);
        let mut resp = client.get(uri.clone()).await?;

        let status = resp.status();
        let header = resp.headers();

        debug!("Status: {}", status);
        for (key, value) in resp.headers() {
            debug!("HEADER {}: {}", key, value.to_str().unwrap());
        }

        if status.is_success() {
            // tokio::io::copy(&mut resp.body_mut().data(), &mut localfile).await?;

            let expected_size = header.get("content-length").map(|v| v.to_str().unwrap().parse::<usize>().unwrap());
            
            while let Some(next) = resp.data().await {
                let mut chunk = next?;
            
                let num_bytes = chunk.len();
                total_read = total_read + num_bytes;
                debug!("Received {} bytes ({} total)", num_bytes, total_read);
            
                // localfile.write_all(&chunk).await?;
                let written = localfile.write(&chunk).await?;
                total_written = total_written + written;
                debug!("Written {} bytes ({} total)", written, total_written);
            
                if total_read != total_written {
                    error!("Could not write all data!");
                }
            
                if expected_size.is_some() && total_read.eq(&expected_size.unwrap()) {
                    return Ok(());
                }
            }

            return Ok(());
        } else if status.is_redirection() {
            let location = header.get("location").unwrap().to_str().unwrap();

            uri = hyper::Uri::from_str(location)?;
        } else {
            let uri_str = uri.to_string();

            return Err(MyError::CustomError(CustomError::from_string(format!("HTTP responded with status {}: {}", status, uri_str))))
        }
    }

    Err(MyError::CustomError(CustomError::from_string(format!("HTTP too many redirections"))))
}

板条箱(不完整,仅相关):

futures = "0.3"
futures-cpupool = "0.1"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.22"
rustls = "0.19"
tokio = { version = "1.6", features = ["full"] }
url = "2.2"

如您所见,下载循环与 Hyper 文档的示例代码相匹配。

我添加了tokio::fs::File写作部分

我添加了调试信息(主要是字节大小)以找出发生了什么以及发生了什么。

评论是理想的方式:使用 write_all 或者如果可能 io::copy.

但我无法在不卡住的情况下让它工作。

能否请您指点我的错误所在?

非常感谢

感谢@HHK在上面的评论。

他建议构建一个最小的、可重现的示例。这样做时,示例运行良好。

所以我反复添加了原始项目中的代码。

我添加的最后一步是我在使项目异步和学习异步时没有删除的遗物。

我在调用异步函数的异步函数中有一个 futures::block_on 调用,导致随机阻塞整个程序。

所以我应该在发布之前制作一段完整的工作代码,这会导致我遇到最初的问题并且让我省去很多麻烦。

为了未来reader:

futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = "0.22"
rustls = "0.19"
log = "0.4"
tokio = { version = "1.6", features = ["full"] }
url = "2.2"

代码:

use std::io::{stderr, stdout, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;

use futures::executor::block_on;
use hyper::body::HttpBody;
use hyper::Client;
use hyper_rustls::HttpsConnector;
use log::{debug, error, LevelFilter, Log, Metadata, Record};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use url::Url;

async fn download_http<P: AsRef<Path>>(url: &Url, localpath: P) -> Result<(), ()> {
    let mut uri = hyper::Uri::from_str(url.as_str()).unwrap();

    let mut total_read: usize = 0;
    let mut total_written: usize = 0;
    let mut localfile = File::create(localpath).await.unwrap();

    // Redirection Limit
    for _ in 0..10 {
        let https = HttpsConnector::with_native_roots();
        let client = Client::builder().build::<_, hyper::Body>(https);
        let mut resp = client.get(uri.clone()).await.unwrap();

        let status = resp.status();
        let header = resp.headers();

        debug!("Status: {}", status);
        for (key, value) in resp.headers() {
            debug!("HEADER {}: {}", key, value.to_str().unwrap());
        }

        if status.is_success() {
            // tokio::io::copy(&mut resp.body_mut().data(), &mut localfile).await.unwrap();

            let expected_size = header.get("content-length").map(|v| v.to_str().unwrap().parse::<usize>().unwrap());

            while let Some(next) = resp.data().await {
                let chunk = next.unwrap();

                let num_bytes = chunk.len();
                total_read = total_read + num_bytes;
                debug!("Received {} bytes ({} total)", num_bytes, total_read);

                // localfile.write_all(&chunk).await.unwrap();
                let written = localfile.write(&chunk).await.unwrap();
                total_written = total_written + written;
                debug!("Written {} bytes ({} total)", written, total_written);

                if total_read != total_written {
                    error!("Could not write all data!");
                }

                if expected_size.is_some() && total_read.eq(&expected_size.unwrap()) {
                    return Ok(());
                }
            }

            return Ok(());
        } else if status.is_redirection() {
            let location = header.get("location").unwrap().to_str().unwrap();

            uri = hyper::Uri::from_str(location).unwrap();
        } else {
            return Err(());
        }
    }

    return Err(());
}


struct Logger;

impl Log for Logger {
    fn enabled(&self, _: &Metadata) -> bool {
        true
    }

    fn log(&self, record: &Record) {
        eprintln!("{}: {}", record.level().as_str().to_uppercase(), record.args());
        stdout().flush().unwrap();
        stderr().flush().unwrap();
    }

    fn flush(&self) {
        stdout().flush().unwrap();
        stderr().flush().unwrap();
    }
}

static LOGGER: Logger = Logger;

#[tokio::main]
async fn main() {
    log::set_logger(&LOGGER).map(move |()| log::set_max_level(LevelFilter::Debug)).unwrap();

    let url = Url::parse("https://github.com/Kitware/CMake/releases/download/v3.20.5/cmake-3.20.5.tar.gz").unwrap();
    let localfile = PathBuf::from("/tmp/cmake-3.20.5.tar.gz");

    block_on(download_http(&url, &localfile)).unwrap();
    // download_http(&url, &localfile).await.unwrap();
}

在 block_on 和不使用它之间切换会有所不同。

现在我可以切换回使用 write_all 并删除我的调试代码。