Lifetime issue returning a stream with hyper/azure sdk

I have been trying to return a stream, the same way I did with tokio::fs::File, but I get a lifetime error on the BlobClient.

error[E0597]: `blob` does not live long enough
  --> src\main.rs:20:27
   |
20 |     let stream = Box::pin(blob.get().stream(128));
   |                           ^^^^^^^^^^
   |                           |
   |                           borrowed value does not live long enough
   |                           argument requires that `blob` is borrowed for `'static`
...
24 | }
   | - `blob` dropped here while still borrowed

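I have tried several different ways of handling the stream, but I cannot get past this lifetime error. I am sure it is something simple that I keep overlooking. Thanks for any help.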

Here is a repro of what I am trying to do:

use std::{convert::Infallible, net::SocketAddr};

use azure_core::new_http_client;
use azure_storage::{
    blob::prelude::{AsBlobClient, AsContainerClient},
    clients::{AsStorageClient, StorageAccountClient},
};
use futures::TryStreamExt;
use hyper::{
    service::{make_service_fn, service_fn},
    Body, Request, Response, Server,
};

async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    let http_client = new_http_client();
    let storage_account_client = StorageAccountClient::new_access_key(http_client.clone(), "account", "key");
    let storage_client = storage_account_client.as_storage_client();
    let blob = storage_client.as_container_client("container").as_blob_client("blob");

    let stream = Box::pin(blob.get().stream(128));
    let s = stream.and_then(|f| futures::future::ok(f.data));

    Ok(Response::new(Body::wrap_stream(s)))
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let make_service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

    let server = Server::bind(&addr).serve(make_service);

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

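The problem is that the stream borrows from `blob`, but `Body::wrap_stream()` only accepts `'static` streams. The response body has to outlive `handle`, while `blob` is dropped as soon as `handle` returns. A workaround is to construct the stream inside a new task, which owns `blob`, and send the stream items back through a channel. The following helper function implements this approach: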

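// `SinkExt` provides `feed`/`close`; `StreamExt` provides `next` (and `boxed` at the call site).
use futures::{pin_mut, stream::BoxStream, SinkExt, Stream, StreamExt};

/// Creates a `'static` stream from a closure returning a (possibly) non-`'static` stream.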
///
/// The stream items, closure, and closure argument are still restricted to being `'static`,
/// but the closure can return a non-`'static` stream that borrows from the closure
/// argument.
fn make_static_stream<T, F, U>(
    make_stream: F,
    mut make_stream_arg: U,
) -> impl Stream<Item = T>
where
    T: Send + 'static,
    F: FnOnce(&mut U) -> BoxStream<'_, T> + Send + 'static,
    U: Send + 'static,
{
    let (mut tx, rx) = futures::channel::mpsc::channel(0);
    tokio::spawn(async move {
        let stream = make_stream(&mut make_stream_arg);
        pin_mut!(stream);
        while let Some(item) = stream.next().await {
            if tx.feed(item).await.is_err() {
                // Receiver dropped
                break;
            }
        }

        tx.close().await.ok();
    });

    rx
}

Here is how you would use it in your original code:

    // ...

    let stream = make_static_stream(
        |blob| blob.get().stream(128).map_ok(|x| x.data).boxed(),
        blob,
    );

    Ok(Response::new(Body::wrap_stream(stream)))
}
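
If it helps to see the ownership trick in isolation, here is a minimal standard-library sketch of the same pattern. It is only an analogy, not the hyper/azure code: it uses an OS thread and `std::sync::mpsc` in place of a tokio task and the futures channel, and a borrowing iterator in place of the borrowing stream. The names `make_static_iter` and `word_lengths` are made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical helper: runs a closure that returns an iterator borrowing
/// from `arg` on its own thread, and hands back an owned `Receiver` that
/// borrows from nothing.
fn make_static_iter<T, F, U>(make_iter: F, mut arg: U) -> Receiver<T>
where
    T: Send + 'static,
    F: for<'a> FnOnce(&'a mut U) -> Box<dyn Iterator<Item = T> + 'a> + Send + 'static,
    U: Send + 'static,
{
    // Capacity 0 makes this a rendezvous channel: each send waits for the receiver.
    let (tx, rx) = sync_channel(0);
    thread::spawn(move || {
        // `arg` is owned by this thread, so the borrow stays valid for the
        // whole loop without needing a `'static` lifetime.
        for item in make_iter(&mut arg) {
            if tx.send(item).is_err() {
                // Receiver dropped
                break;
            }
        }
    });
    rx
}

/// Usage sketch: the iterator borrows from `words`, but the returned values
/// come out of an owned receiver.
fn word_lengths() -> Vec<usize> {
    let words = vec![String::from("a"), String::from("bb"), String::from("ccc")];
    let rx = make_static_iter::<usize, _, _>(
        |w: &mut Vec<String>| Box::new(w.iter().map(|s| s.len())),
        words,
    );
    rx.into_iter().collect()
}
```

The point is the same as in `make_static_stream`: the value being borrowed from is moved into the spawned task, so the borrow never has to escape the scope that owns it, and only the owned items cross the channel.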