为 tonic::Streaming 实现 AsyncRead

impl AsyncRead for tonic::Streaming

我正在尝试使用 tonic routeguide tutorial, and turn the client into a rocket 服务器。我只是获取响应并将其从 gRPC 转换为字符串。

service RouteGuide {
    rpc GetFeature(Point) returns (Feature) {}
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
}

这对 GetFeature 来说效果很好。对于 ListFeatures 查询,就像 Tonic 允许客户端响应中的流一样,我想将其传递给 Rocket 客户端。我看到 Rocket 支持 streaming responses, but I need to implement the AsyncRead 特性。

有什么办法可以做这样的事情吗?以下是我所做工作的精简版:

struct FeatureStream {
    stream: tonic::Streaming<Feature>,
}

impl AsyncRead for FeatureStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Write out as utf8 any response messages.
        match Pin::new(&mut self.stream.message()).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(feature) => Poll::Pending,
        }
    }
}

#[get("/list_features")]
async fn list_features(client: State<'_, RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
    let rectangle = Rectangle {
        low: Some(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        high: Some(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    };
    let mut client = client.inner().clone();
    let stream = client
        .list_features(Request::new(rectangle))
        .await
        .unwrap()
        .into_inner();
    Stream::from(FeatureStream { stream })
}

#[rocket::launch]
async fn rocket() -> rocket::Rocket {
    rocket::ignite()
        .manage(
            create_route_guide_client("http://[::1]:10000")
                .await
                .unwrap(),
        )
        .mount("/", rocket::routes![list_features,])
}

出现错误:

error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>` cannot be unpinned
   --> src/web_user.rs:34:15
    |
34  |         match Pin::new(&mut self.stream.message()).poll(cx) {
    |               ^^^^^^^^ within `impl std::future::Future`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>`
    | 
   ::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
    |
106 |     pub async fn message(&mut self) -> Result<Option<T>, Status> {
    |                                        ------------------------- within this `impl std::future::Future`
    |
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required by `Pin::<P>::new`

感谢 Omer Erden 回答这个问题。所以它归结为基于 futures::Stream 特征实现 AsyncRead,tonic::Streaming 实现。这是我实际使用的代码。

impl AsyncRead for FeatureStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        use futures::stream::StreamExt;
        use std::io::{Error, ErrorKind};

        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(m))) => {
                buf.put_slice(format!("{:?}\n", m).as_bytes());
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Some(Err(e))) => {
                Poll::Ready(Err(Error::new(ErrorKind::Other, format!("{:?}", e))))
            }
            Poll::Ready(None) => {
                // None from a stream means the stream terminated. To indicate
                // that from AsyncRead we return Ok and leave buf unchanged.
                Poll::Ready(Ok(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

与此同时,我的解决方法是创建 TcpStream(实现 AsyncRead)的两端和 return 它的一端,同时产生一个单独的任务来实际写出结果。

#[get("/list_features")]
async fn list_features(
    client: State<'_, RouteGuideClient<Channel>>,
    tasks: State<'_, Mutex<Vec<tokio::task::JoinHandle<()>>>>,
) -> Result<Stream<TcpStream>, Debug<std::io::Error>> {
    let mut client = client.inner().clone();
    let mut feature_stream = client
        .list_features(Request::new(Rectangle {
            low: Some(Point {
                latitude: 400000000,
                longitude: -750000000,
            }),
            high: Some(Point {
                latitude: 420000000,
                longitude: -730000000,
            }),
        }))
        .await
        .unwrap()
        .into_inner();

    // Port 0 tells to operating system to choose an unused port.
    let tcp_listener = TcpListener::bind(("127.0.0.1", 0)).await?;
    let socket_addr = tcp_listener.local_addr().unwrap();
    tasks.lock().unwrap().push(tokio::spawn(async move {
        let mut tcp_stream = TcpStream::connect(socket_addr).await.unwrap();

        while let Some(feature) = feature_stream.message().await.unwrap() {
            match tcp_stream
                .write_all(format!("{:?}\n", feature).as_bytes())
                .await
            {
                Ok(()) => (),
                Err(e) => panic!(e),
            }
        }
        println!("End task");
        ()
    }));
    Ok(Stream::from(tcp_listener.accept().await?.0))
}

问题是从 tonic::Streaming<Feature>::message() 生成的 Future 没有实现 Unpin 因为它是一个 async 函数。让我们将此类型标记为 MessageFuture,您无法安全地 固定 &mut MessageFuture 指针 ,因为取消引用的类型 MessageFuture 未实现 Unpin

为什么不安全?

来自 referenceUnpin 的实施带来:

Types that can be safely moved after being pinned.

这意味着如果 T:!Unpin 那么 Pin<&mut T> 是不可移动的,这很重要,因为异步块创建的 Futures 没有 Unpin 实现,因为它可能持有引用一个成员本身,如果你移动 T 这个引用的指针也将被移动,但引用仍然指向相同的地址,为了防止它不应该是可移动的。请阅读 "Pinning" section from async-book 以形象化原因。

注意: T:!Unpin 表示 T 是没有 Unpin 实现的类型。

解决方案

message() 函数是从 tonic::Streaming<T> 中选择下一条消息的助手。您不需要特别调用 message() 来从流中选择下一个元素,您的结构中已经有了实际的流。

struct FeatureStream {stream: tonic::Streaming<Feature>}

您可以等待 AsyncRead 的下一条消息,例如:

impl AsyncRead for FeatureStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {

       //it returns Pending for all cases as your code does, you can change it as you want
        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(m))) => Poll::Pending,
            Poll::Ready(Some(Err(e))) => Poll::Pending,
            Poll::Ready(None) => Poll::Pending,
            Poll::Pending => Poll::Pending
        }
    }
}

请注意 tonic::Streaming<T> 实现了 Unpin(reference)