Unpinning a BoxStream to send using Tonic gRPC stream
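I'm new to Rust and am writing a simple application that streams some values over gRPC using Tonic. These values are initially acquired from an external library as a BoxStream (Pin<Box<Stream>>), while tonic's API requires something that implements Stream (which, of course, Pin doesn't).
Tonic's streaming example uses a ReceiverStream to convert an mpsc channel into a stream, and spawns a detached thread to push values into it. That would require a 'static lifetime for the stream, which is not an option for my actual implementation, because the lifetime of my stream is tied to the class that returns it.
What is the best way to give Tonic something that implements Stream, starting from my Pin<Box<Stream>>?
src/main.rs (this does not compile, because BoxStream<'static, Entry> does not implement IntoStreamingRequest)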
use futures::prelude::stream::BoxStream;
use async_stream::stream;
use tonic::{IntoStreamingRequest};
struct Entry {
    key: String,
}

fn main() {
    // Create Request
    let stream: BoxStream<'static, Entry> = api_function();
    let request = stream.into_streaming_request();
    // Send request
    //let mut client = DataImporterClient::connect("http://[::1]:50051").await.unwrap();
    //let response = client.grpc_function(request).await?;
}

fn api_function() -> BoxStream<'static, Entry> {
    Box::pin(stream! {
        let entries = vec![
            Entry { key: String::from("value1") },
            Entry { key: String::from("value2") },
        ];
        for entry in entries {
            yield entry;
        }
    })
}
Cargo.toml
[package]
name = "tonic-streaming-minimum-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tonic = "0.5"
futures = "0.3"
tokio-stream = "0.1"
async-stream = "0.3"
The resulting compile error:
error[E0599]: the method `into_streaming_request` exists for struct `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>`, but its trait bounds were not satisfied
--> src\main.rs:12:26
|
12 | let request = stream.into_streaming_request();
| ^^^^^^^^^^^^^^^^^^^^^^ method cannot be called on `Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>` due to unsatisfied trait bounds
|
::: C:\Users\tmathews\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\pin.rs:408:1
|
408 | pub struct Pin<P> {
| -----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sync`
|
::: C:\Users\tmathews\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-core-0.3.17\src\stream.rs:27:1
|
27 | pub trait Stream {
| ----------------
| |
| doesn't satisfy `_: IntoStreamingRequest`
| doesn't satisfy `_: Sized`
| doesn't satisfy `_: Sync`
|
= note: the following trait bounds were not satisfied:
`Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: futures::Stream`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: std::marker::Send`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: Sync`
which is required by `&Pin<Box<dyn futures::Stream<Item = Entry> + std::marker::Send>>: IntoStreamingRequest`
`&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: Sync`
which is required by `&mut Pin<Box<(dyn futures::Stream<Item = Entry> + std::marker::Send + 'static)>>: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sized`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: futures::Stream`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: std::marker::Send`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&dyn futures::Stream<Item = Entry> + std::marker::Send: Sync`
which is required by `&dyn futures::Stream<Item = Entry> + std::marker::Send: IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): futures::Stream`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
`&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): Sync`
which is required by `&mut (dyn futures::Stream<Item = Entry> + std::marker::Send + 'static): IntoStreamingRequest`
The problem is that tonic (0.5, as pinned in the Cargo.toml above) implements IntoStreamingRequest only for types that are both Send and Sync:
impl<T> IntoStreamingRequest for T
where
T: Stream + Send + Sync + 'static
But BoxStream is not Sync:
pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;
Instead of using BoxStream, copy its definition and add an extra + Sync bound:
// Needed in addition to the existing imports:
use futures::Stream;
use std::pin::Pin;

fn api_function() -> Pin<Box<dyn Stream<Item = Entry> + Send + Sync + 'static>> {
    Box::pin(stream! {
        let entries = vec![
            Entry { key: String::from("value1") },
            Entry { key: String::from("value2") },
        ];
        for entry in entries {
            yield entry;
        }
    })
}
And since the stream returned by the stream!() macro is already Send + Sync here, your code then compiles fine.
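To see why the extra bound matters without pulling in any crates, here is a minimal sketch. The `Stream` trait below is a hypothetical stand-in for `futures::Stream` (with a simplified, non-async `next_item` method), and `Keys`, `SyncBoxStream` and `is_send_sync` are names made up for illustration. The point it shows is only that auto traits such as `Sync` must be spelled out on a `dyn` type: the trait object does not inherit them from the concrete type it was built from.

```rust
use std::pin::Pin;

/// Hypothetical stand-in for `futures::Stream`, so the sketch needs no crates.
trait Stream {
    type Item;
    fn next_item(self: Pin<&mut Self>) -> Option<Self::Item>;
}

/// A trivial stream over an owned list of keys (assumed for illustration).
struct Keys {
    remaining: std::vec::IntoIter<String>,
}

impl Stream for Keys {
    type Item = String;
    fn next_item(mut self: Pin<&mut Self>) -> Option<String> {
        self.remaining.next()
    }
}

/// Same shape as `futures::stream::BoxStream`: `Send`, but not `Sync`.
#[allow(dead_code)]
type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;

/// The same definition with the extra `+ Sync` bound.
type SyncBoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

/// Only type-checks for arguments that are `Send + Sync + 'static`.
fn is_send_sync<T: Send + Sync + 'static>(_: &T) -> bool {
    true
}

fn api_function() -> SyncBoxStream<'static, String> {
    let keys = vec![String::from("value1"), String::from("value2")];
    Box::pin(Keys { remaining: keys.into_iter() })
}

fn main() {
    let mut stream = api_function();
    // Accepted: the trait object type itself promises `Sync`.
    assert!(is_send_sync(&stream));

    // Rejected by the compiler if uncommented, because `dyn Stream + Send`
    // says nothing about `Sync`, even though `Keys` happens to be `Sync`:
    // let plain: BoxStream<'static, String> = api_function();
    // is_send_sync(&plain);

    while let Some(key) = stream.as_mut().next_item() {
        println!("{}", key);
    }
}
```

The commented-out lines correspond to the situation in the question: the concrete stream is fine, but the `BoxStream` alias erases the `Sync` guarantee that the blanket impl asks for.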
PS: Remove the unnecessary type annotation:
let stream: BoxStream<'static, Entry> = api_function();
// should become:
let stream = api_function(); // after the above change it's not BoxStream anymore!