Lifetime issue returning a stream with hyper/azure sdk
I have been trying to return a stream, as I have done with tokio::fs::File, but I am getting a lifetime error on the BlobClient.
error[E0597]: `blob` does not live long enough
  --> src\main.rs:20:27
   |
20 |     let stream = Box::pin(blob.get().stream(128));
   |                           ^^^^^^^^^^
   |                           |
   |                           borrowed value does not live long enough
   |                           argument requires that `blob` is borrowed for `'static`
...
24 | }
   | - `blob` dropped here while still borrowed
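I have tried a number of different ways of handling the stream, but I can't get past this lifetime error. I'm sure it's probably something simple that I keep overlooking. Thanks for any help.
Here is a repro of what I am trying to do: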
use std::{convert::Infallible, net::SocketAddr};

use azure_core::new_http_client;
use azure_storage::{
    blob::prelude::{AsBlobClient, AsContainerClient},
    clients::{AsStorageClient, StorageAccountClient},
};
use futures::TryStreamExt;
use hyper::{
    service::{make_service_fn, service_fn},
    Body, Request, Response, Server,
};

async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    let http_client = new_http_client();
    let storage_account_client = StorageAccountClient::new_access_key(http_client.clone(), "account", "key");
    let storage_client = storage_account_client.as_storage_client();
    let blob = storage_client.as_container_client("container").as_blob_client("blob");
    let stream = Box::pin(blob.get().stream(128));
    let s = stream.and_then(|f| futures::future::ok(f.data));
    Ok(Response::new(Body::wrap_stream(s)))
}

#[tokio::main]
async fn main() {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let make_service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
    let server = Server::bind(&addr).serve(make_service);
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}
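The problem is that the stream borrows from blob, but the wrap_stream() function only accepts 'static streams. A workaround is to construct the stream in a new task and send the stream items back through a channel. The following helper function helps implement this approach: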
// Imports the helper needs (assuming futures 0.3).
use futures::{pin_mut, stream::BoxStream, SinkExt, Stream, StreamExt};

/// Creates a `'static` stream from a closure returning a (possibly) non-`'static` stream.
///
/// The stream items, closure, and closure argument are still restricted to being `'static`,
/// but the closure can return a non-`'static` stream that borrows from the closure
/// argument.
fn make_static_stream<T, F, U>(
    make_stream: F,
    mut make_stream_arg: U,
) -> impl Stream<Item = T>
where
    T: Send + 'static,
    F: FnOnce(&mut U) -> BoxStream<'_, T> + Send + 'static,
    U: Send + 'static,
{
    // Zero-capacity channel: the task only runs as far ahead as the receiver polls.
    let (mut tx, rx) = futures::channel::mpsc::channel(0);
    tokio::spawn(async move {
        // The task owns `make_stream_arg`, so the stream may borrow from it here.
        let stream = make_stream(&mut make_stream_arg);
        pin_mut!(stream);
        while let Some(item) = stream.next().await {
            if tx.feed(item).await.is_err() {
                // Receiver dropped
                break;
            }
        }
        tx.close().await.ok();
    });
    rx
}
Here is how you would use it in your original code:
    // ...
    let stream = make_static_stream(
        |blob| blob.get().stream(128).map_ok(|x| x.data).boxed(),
        blob,
    );
    Ok(Response::new(Body::wrap_stream(stream)))
}
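If the moving parts are hard to see through the async machinery, the same ownership trick can be sketched with only the standard library. This is a synchronous analogy, not the answer's code: a thread stands in for the tokio task, an iterator for the stream, and std::sync::mpsc for the futures channel. Owner, chunks, and make_static_iter are hypothetical names made up for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a client whose iterator borrows from it,
/// much like the stream returned by `blob.get().stream(..)`.
struct Owner {
    chunks: Vec<String>,
}

impl Owner {
    /// The returned iterator borrows `self`, so it is not `'static`.
    fn chunks(&self) -> impl Iterator<Item = &str> + '_ {
        self.chunks.iter().map(|s| s.as_str())
    }
}

/// Moves `owner` into a worker thread, builds the borrowing iterator there,
/// and forwards owned copies of each item through a zero-capacity channel.
/// The receiving side owns nothing borrowed, so it is `'static`.
fn make_static_iter(owner: Owner) -> impl Iterator<Item = String> + 'static {
    let (tx, rx) = mpsc::sync_channel::<String>(0);
    thread::spawn(move || {
        // `owner` lives in this thread for as long as the iterator is in use.
        for chunk in owner.chunks() {
            if tx.send(chunk.to_owned()).is_err() {
                // Receiver dropped
                break;
            }
        }
        // `tx` is dropped here, which ends the receiving iterator.
    });
    rx.into_iter()
}

fn main() {
    let owner = Owner {
        chunks: vec!["a".to_string(), "b".to_string(), "c".to_string()],
    };
    let items: Vec<String> = make_static_iter(owner).collect();
    println!("{:?}", items);
}
```

The point is the same as in make_static_stream: the value being borrowed from is moved into the worker, so the borrow never has to outlive the function that created it, and only owned items cross the channel.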