Unpinning a BoxStream to send using Tonic gRPC stream

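I'm new to Rust and am writing a simple application that streams some values over gRPC using Tonic. These values are initially acquired from an external library as a BoxStream (Pin<Box<Stream>>), whereas tonic's API requires something that implements Stream (which of course Pin does not).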

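Tonic's streaming example uses a ReceiverStream to convert an mpsc channel into a stream, and spawns off a thread to push values into it. That would require a 'static lifetime for the stream, which is not an option for my actual implementation, because the lifetime of my stream is tied to the class that returns it.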
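To make the channel pattern concrete, here is a minimal std-only sketch of the same idea. It is an analogue, not tonic's code: it assumes std::sync::mpsc and std::thread in place of the async channel and ReceiverStream, and produce_via_channel is a hypothetical helper name. It shows where the 'static requirement comes from: the spawned closure has to own everything it touches.

```rust
use std::sync::mpsc;
use std::thread;

// Std-only analogue of the channel pattern: a producer thread pushes values
// into a channel and the consumer drains the receiving end.
// `produce_via_channel` is a hypothetical helper, not part of tonic.
fn produce_via_channel(values: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();

    // `thread::spawn` requires a `'static` closure, so `values` is moved in.
    // Borrowing data that is owned by the caller would not compile here.
    let producer = thread::spawn(move || {
        for value in values {
            // Stop early if the receiving end has been dropped.
            if tx.send(value).is_err() {
                break;
            }
        }
    });

    // The iterator ends once the producer finishes and drops the sender.
    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

Calling produce_via_channel with a Vec of owned strings returns them in the order they were sent. A stream that borrows from its owner cannot be moved into the closure this way, which is the limitation described above.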

What is the best way to turn my Pin<Box<Stream>> into something that implements Stream and that I can hand to Tonic?

src/main.rs (this does not compile, because BoxStream<'static, Entry> does not implement IntoStreamingRequest)

use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::{IntoStreamingRequest};

struct Entry {
    key: String,
}

fn main() {
    // Create Request
    let stream: BoxStream<'static, Entry> = api_function();
    let request = stream.into_streaming_request();

    // Send request
    //let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
    //let response = client.grpc_function(request).await?;
}

fn api_function() -> BoxStream<'static, Entry> {
    Box::pin(stream! {
        let entries = vec!(
            Entry {key: String::from("value1")},
            Entry {key: String::from("value2")}
        );

        for entry in entries {
            yield entry;
        }
    })
}

Cargo.toml

[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"

The resulting compile error:

error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
   --> src\main.rs:12:26
    |
12  |     let request = stream.into_streaming_request();
    |                          ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
    |
   ::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
    |
408 | pub struct Pin<P> {
    | -----------------
    | |
    | doesn't satisfy `_: IntoStreamingRequest`
    | doesn't satisfy `_: Sync`
    |
   ::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
    |
27  | pub trait Stream {
    | ----------------
    | |
    | doesn't satisfy `_: IntoStreamingRequest`
    | doesn't satisfy `_: Sized`
    | doesn't satisfy `_: Sync`
    |
    = note: the following trait bounds were not satisfied:
            `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
            which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
            which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
            `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
            which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
            `dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
            which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
            which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
            which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
            `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
            which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
            `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
            which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`


The problem is that tonic implements IntoStreamingRequest only for types that are both Send and Sync:

impl<T> IntoStreamingRequest for T
where
    T: Stream + Send + Sync + 'static

But BoxStream is only Send, not Sync:

pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;

Instead of using BoxStream, copy its definition and add an extra + Sync bound:

fn api_function() -> Pin<Box<dyn Stream<Item = Entry> + Send + Sync + 'static>> {
    Box::pin(stream! {
        let entries = vec!(
            Entry {key: String::from("value1")},
            Entry {key: String::from("value2")}
        );

        for entry in entries {
            yield entry;
        }
    })
}

Since the stream returned by the stream!() macro here is already Send + Sync, your code then compiles.
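The effect of the missing bound can be checked in isolation. The following is a minimal std-only sketch: MiniStream is a hypothetical stand-in for futures::Stream, and Counter, SendOnly, SendSync, accepts_send_sync, and drain are illustrative names rather than anything from tonic or futures. It shows that a boxed trait object is only Sync when the bound is spelled out in the type, even if the concrete value inside is Sync.

```rust
use std::pin::Pin;

// Hypothetical stand-in for `futures::Stream`, so the sketch needs only std.
trait MiniStream {
    type Item;
    fn next_item(self: Pin<&mut Self>) -> Option<Self::Item>;
}

// A trivial stream that yields 1, 2, 3 and then ends.
struct Counter(u32);

impl MiniStream for Counter {
    type Item = u32;
    fn next_item(mut self: Pin<&mut Self>) -> Option<u32> {
        if self.0 < 3 {
            self.0 += 1;
            Some(self.0)
        } else {
            None
        }
    }
}

// Same shape as `BoxStream`: `Send` is named, `Sync` is not.
type SendOnly = Pin<Box<dyn MiniStream<Item = u32> + Send + 'static>>;
// The same alias with an explicit `+ Sync`.
type SendSync = Pin<Box<dyn MiniStream<Item = u32> + Send + Sync + 'static>>;

// Mirrors the `Send + Sync + 'static` part of the bounds quoted above.
fn accepts_send_sync<T: Send + Sync + 'static>(_value: &T) -> bool {
    true
}

fn make_send_only() -> SendOnly {
    Box::pin(Counter(0))
}

fn make_send_sync() -> SendSync {
    Box::pin(Counter(0))
}

// Pull every item out of the boxed stream.
fn drain(mut stream: SendSync) -> Vec<u32> {
    let mut out = Vec::new();
    while let Some(item) = stream.as_mut().next_item() {
        out.push(item);
    }
    out
}
```

Passing the result of make_send_sync() to accepts_send_sync compiles, while passing the result of make_send_only() is rejected because the trait object type does not promise Sync, even though the Counter inside is the same in both cases.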

PS: remove the now-incorrect type annotation:

    let stream: BoxStream<'static, Entry> = api_function();
// should become:
    let stream = api_function(); // after the above change it's not BoxStream anymore!