使用 hyper crate Body 作为 Future crate Stream 参数

Use hyper crate Body as Future crate Stream parameter

使用 hyper crate, I'm making an HTTP request to an endpoint then subsequently attempting to pass the response Body to a third-party library that expects a parameter to be a Futures crate Stream.

这会导致类型错误。

Cargo.toml

[dependencies]
bytes = "1.0.1"
http = "0.2.3"
tokio = { version = "1.1.0", features = ["full"] }
hyper = { version = "0.14.2", features = ["full"] }
hyper-tls = "0.5.0"
futures = "0.3.12"

例子

use std::io;
use bytes::Bytes;
use hyper::{Client, Body};
use hyper_tls::HttpsConnector;
use http::Request;
use futures::stream::Stream;

// ---- begin third-party library
type ConsumableStream = dyn Stream<Item = Result<Bytes, io::Error>> + Send + Sync + 'static;
async fn stream_consumer(_: &mut ConsumableStream) {
    // consume stream...
}
// ---- end third-party library

#[tokio::main]
async fn main() {
    let uri = "https://jsonplaceholder.typicode.com/todos/1";
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);
    let request = Request::get(uri).body(Body::empty()).unwrap();
    let response = client.request(request).await.unwrap();
    let mut body = Box::new(response.into_body());
    stream_consumer(&mut body).await;
}

货物运行输出

error[E0271]: type mismatch resolving `<std::boxed::Box<hyper::body::body::Body> as futures_core::stream::Stream>::Item == std::result::Result<bytes::bytes::Bytes, std::io::Error>`
  --> src/bin/future_streams.rs:24:21
   |
24 |     stream_consumer(&mut body).await;
   |                     ^^^^^^^^^ expected struct `std::io::Error`, found struct `hyper::error::Error`
   |
   = note: expected enum `std::result::Result<_, std::io::Error>`
              found enum `std::result::Result<_, hyper::error::Error>`
   = note: required for the cast to the object type `(dyn futures_core::stream::Stream<Item = std::result::Result<bytes::bytes::Bytes, std::io::Error>> + std::marker::Send + std::marker::Sync + 'static)`

error: aborting due to previous error; 1 warning emitted

For more information about this error, try `rustc --explain E0271`.
error: could not compile `rustest`.

To learn more, run the command again with --verbose.

问题

将 hyper Body 用作预期 Future Stream 类型的函数参数的最有效方法是什么?

ConsumableStream 期待 Result<Bytes, io::Error>,但 client.request returns 期待 Result<Bytes, hyper::Error>。如果ConsumableStream来自第三方库,你不能改变类型定义,你可以映射结果流:

use futures::TryStreamExt;
use std::io;

#[tokio::main]
async fn main() {
    // ...
    let body = response
        .into_body()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
    stream_consumer(&mut Box::new(body)).await;
}