为 tonic::Streaming 实现 AsyncRead
impl AsyncRead for tonic::Streaming
我正在尝试使用 tonic routeguide tutorial, and turn the client into a rocket 服务器。我只是获取响应并将其从 gRPC 转换为字符串。
service RouteGuide {
rpc GetFeature(Point) returns (Feature) {}
rpc ListFeatures(Rectangle) returns (stream Feature) {}
}
这对 GetFeature 来说效果很好。对于 ListFeatures 查询,就像 Tonic 允许客户端响应中的流一样,我想将其传递给 Rocket 客户端。我看到 Rocket 支持 streaming responses, but I need to implement the AsyncRead 特性。
有什么办法可以做这样的事情吗?以下是我所做工作的精简版:
struct FeatureStream {
stream: tonic::Streaming<Feature>,
}
impl AsyncRead for FeatureStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
// Write out as utf8 any response messages.
match Pin::new(&mut self.stream.message()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(feature) => Poll::Pending,
}
}
}
#[get("/list_features")]
async fn list_features(client: State<'_, RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
let rectangle = Rectangle {
low: Some(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
high: Some(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
};
let mut client = client.inner().clone();
let stream = client
.list_features(Request::new(rectangle))
.await
.unwrap()
.into_inner();
Stream::from(FeatureStream { stream })
}
#[rocket::launch]
async fn rocket() -> rocket::Rocket {
rocket::ignite()
.manage(
create_route_guide_client("http://[::1]:10000")
.await
.unwrap(),
)
.mount("/", rocket::routes![list_features,])
}
出现错误:
error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>` cannot be unpinned
--> src/web_user.rs:34:15
|
34 | match Pin::new(&mut self.stream.message()).poll(cx) {
| ^^^^^^^^ within `impl std::future::Future`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>`
|
::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
|
106 | pub async fn message(&mut self) -> Result<Option<T>, Status> {
| ------------------------- within this `impl std::future::Future`
|
= note: required because it appears within the type `impl std::future::Future`
= note: required because it appears within the type `impl std::future::Future`
= note: required by `Pin::<P>::new`
感谢 Omer Erden 回答这个问题。所以它归结为基于 futures::Stream 特征实现 AsyncRead,tonic::Streaming 实现。这是我实际使用的代码。
impl AsyncRead for FeatureStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
use futures::stream::StreamExt;
use std::io::{Error, ErrorKind};
match self.stream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(m))) => {
buf.put_slice(format!("{:?}\n", m).as_bytes());
Poll::Ready(Ok(()))
}
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Err(Error::new(ErrorKind::Other, format!("{:?}", e))))
}
Poll::Ready(None) => {
// None from a stream means the stream terminated. To indicate
// that from AsyncRead we return Ok and leave buf unchanged.
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}
与此同时,我的解决方法是创建 TcpStream(实现 AsyncRead)的两端和 return 它的一端,同时产生一个单独的任务来实际写出结果。
#[get("/list_features")]
async fn list_features(
client: State<'_, RouteGuideClient<Channel>>,
tasks: State<'_, Mutex<Vec<tokio::task::JoinHandle<()>>>>,
) -> Result<Stream<TcpStream>, Debug<std::io::Error>> {
let mut client = client.inner().clone();
let mut feature_stream = client
.list_features(Request::new(Rectangle {
low: Some(Point {
latitude: 400000000,
longitude: -750000000,
}),
high: Some(Point {
latitude: 420000000,
longitude: -730000000,
}),
}))
.await
.unwrap()
.into_inner();
// Port 0 tells to operating system to choose an unused port.
let tcp_listener = TcpListener::bind(("127.0.0.1", 0)).await?;
let socket_addr = tcp_listener.local_addr().unwrap();
tasks.lock().unwrap().push(tokio::spawn(async move {
let mut tcp_stream = TcpStream::connect(socket_addr).await.unwrap();
while let Some(feature) = feature_stream.message().await.unwrap() {
match tcp_stream
.write_all(format!("{:?}\n", feature).as_bytes())
.await
{
Ok(()) => (),
Err(e) => panic!(e),
}
}
println!("End task");
()
}));
Ok(Stream::from(tcp_listener.accept().await?.0))
}
问题是从 tonic::Streaming<Feature>::message()
生成的 Future
没有实现 Unpin
因为它是一个 async 函数。让我们将此类型标记为 MessageFuture
,您无法安全地 固定 &mut MessageFuture
指针 ,因为取消引用的类型 MessageFuture
未实现 Unpin
。
为什么不安全?
来自 reference,Unpin
的实施带来:
Types that can be safely moved after being pinned.
这意味着如果 T:!Unpin
那么 Pin<&mut T>
是不可移动的,这很重要,因为异步块创建的 Future
s 没有 Unpin
实现,因为它可能持有引用一个成员本身,如果你移动 T
这个引用的指针也将被移动,但引用仍然指向相同的地址,为了防止它不应该是可移动的。请阅读 "Pinning" section from async-book 以形象化原因。
注意: T:!Unpin
表示 T
是没有 Unpin
实现的类型。
解决方案
message()
函数是从 tonic::Streaming<T>
中选择下一条消息的助手。您不需要特别调用 message()
来从流中选择下一个元素,您的结构中已经有了实际的流。
struct FeatureStream {stream: tonic::Streaming<Feature>}
您可以等待 AsyncRead
的下一条消息,例如:
impl AsyncRead for FeatureStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
//it returns Pending for all cases as your code does, you can change it as you want
match self.stream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(m))) => Poll::Pending,
Poll::Ready(Some(Err(e))) => Poll::Pending,
Poll::Ready(None) => Poll::Pending,
Poll::Pending => Poll::Pending
}
}
}
请注意 tonic::Streaming<T>
实现了 Unpin
(reference)
我正在尝试使用 tonic routeguide tutorial, and turn the client into a rocket 服务器。我只是获取响应并将其从 gRPC 转换为字符串。
service RouteGuide {
rpc GetFeature(Point) returns (Feature) {}
rpc ListFeatures(Rectangle) returns (stream Feature) {}
}
这对 GetFeature 来说效果很好。对于 ListFeatures 查询,就像 Tonic 允许客户端响应中的流一样,我想将其传递给 Rocket 客户端。我看到 Rocket 支持 streaming responses, but I need to implement the AsyncRead 特性。
有什么办法可以做这样的事情吗?以下是我所做工作的精简版:
struct FeatureStream {
stream: tonic::Streaming<Feature>,
}
impl AsyncRead for FeatureStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
// Write out as utf8 any response messages.
match Pin::new(&mut self.stream.message()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(feature) => Poll::Pending,
}
}
}
#[get("/list_features")]
async fn list_features(client: State<'_, RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
let rectangle = Rectangle {
low: Some(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
high: Some(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
};
let mut client = client.inner().clone();
let stream = client
.list_features(Request::new(rectangle))
.await
.unwrap()
.into_inner();
Stream::from(FeatureStream { stream })
}
#[rocket::launch]
async fn rocket() -> rocket::Rocket {
rocket::ignite()
.manage(
create_route_guide_client("http://[::1]:10000")
.await
.unwrap(),
)
.mount("/", rocket::routes![list_features,])
}
出现错误:
error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>` cannot be unpinned
--> src/web_user.rs:34:15
|
34 | match Pin::new(&mut self.stream.message()).poll(cx) {
| ^^^^^^^^ within `impl std::future::Future`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>`
|
::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
|
106 | pub async fn message(&mut self) -> Result<Option<T>, Status> {
| ------------------------- within this `impl std::future::Future`
|
= note: required because it appears within the type `impl std::future::Future`
= note: required because it appears within the type `impl std::future::Future`
= note: required by `Pin::<P>::new`
感谢 Omer Erden 回答这个问题。所以它归结为基于 futures::Stream 特征实现 AsyncRead,tonic::Streaming 实现。这是我实际使用的代码。
impl AsyncRead for FeatureStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
use futures::stream::StreamExt;
use std::io::{Error, ErrorKind};
match self.stream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(m))) => {
buf.put_slice(format!("{:?}\n", m).as_bytes());
Poll::Ready(Ok(()))
}
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Err(Error::new(ErrorKind::Other, format!("{:?}", e))))
}
Poll::Ready(None) => {
// None from a stream means the stream terminated. To indicate
// that from AsyncRead we return Ok and leave buf unchanged.
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}
与此同时,我的解决方法是创建 TcpStream(实现 AsyncRead)的两端和 return 它的一端,同时产生一个单独的任务来实际写出结果。
#[get("/list_features")]
async fn list_features(
client: State<'_, RouteGuideClient<Channel>>,
tasks: State<'_, Mutex<Vec<tokio::task::JoinHandle<()>>>>,
) -> Result<Stream<TcpStream>, Debug<std::io::Error>> {
let mut client = client.inner().clone();
let mut feature_stream = client
.list_features(Request::new(Rectangle {
low: Some(Point {
latitude: 400000000,
longitude: -750000000,
}),
high: Some(Point {
latitude: 420000000,
longitude: -730000000,
}),
}))
.await
.unwrap()
.into_inner();
// Port 0 tells to operating system to choose an unused port.
let tcp_listener = TcpListener::bind(("127.0.0.1", 0)).await?;
let socket_addr = tcp_listener.local_addr().unwrap();
tasks.lock().unwrap().push(tokio::spawn(async move {
let mut tcp_stream = TcpStream::connect(socket_addr).await.unwrap();
while let Some(feature) = feature_stream.message().await.unwrap() {
match tcp_stream
.write_all(format!("{:?}\n", feature).as_bytes())
.await
{
Ok(()) => (),
Err(e) => panic!(e),
}
}
println!("End task");
()
}));
Ok(Stream::from(tcp_listener.accept().await?.0))
}
问题是从 tonic::Streaming<Feature>::message()
生成的 Future
没有实现 Unpin
因为它是一个 async 函数。让我们将此类型标记为 MessageFuture
,您无法安全地 固定 &mut MessageFuture
指针 ,因为取消引用的类型 MessageFuture
未实现 Unpin
。
为什么不安全?
来自 reference,Unpin
的实施带来:
Types that can be safely moved after being pinned.
这意味着如果 T:!Unpin
那么 Pin<&mut T>
是不可移动的,这很重要,因为异步块创建的 Future
s 没有 Unpin
实现,因为它可能持有引用一个成员本身,如果你移动 T
这个引用的指针也将被移动,但引用仍然指向相同的地址,为了防止它不应该是可移动的。请阅读 "Pinning" section from async-book 以形象化原因。
注意: T:!Unpin
表示 T
是没有 Unpin
实现的类型。
解决方案
message()
函数是从 tonic::Streaming<T>
中选择下一条消息的助手。您不需要特别调用 message()
来从流中选择下一个元素,您的结构中已经有了实际的流。
struct FeatureStream {stream: tonic::Streaming<Feature>}
您可以等待 AsyncRead
的下一条消息,例如:
impl AsyncRead for FeatureStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
//it returns Pending for all cases as your code does, you can change it as you want
match self.stream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(m))) => Poll::Pending,
Poll::Ready(Some(Err(e))) => Poll::Pending,
Poll::Ready(None) => Poll::Pending,
Poll::Pending => Poll::Pending
}
}
}
请注意 tonic::Streaming<T>
实现了 Unpin
(reference)