在未来的实现中手动轮询流
Manually polling streams in future implementation
我正在迁移到 futures
0.3 和 tokio
0.2,并且有一个重复出现的模式我无法重新使用。我不确定这种模式是否已经过时,或者我是否对 Pin
.
做错了什么
通常我有一种类型,它包含一个套接字和几个通道接收器。此类结构的 Future
实现包括重复轮询流,直到它们 return Pending
(NotReady
在 0.1 生态系统中)。
然而,在 futures 0.3 中,Future::poll
和 Stream::poll_next
使用 self
而不是 &mut self
,并且这种模式不再有效:
use futures::{
stream::Stream,
task::{Context, Poll},
Future,
};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};
/// Dummy structure that represent some state we update when we
/// receive data or events.
struct State;
impl State {
fn update(&mut self, _data: Vec<u8>) {
println!("updated state");
}
fn handle_event(&mut self, _event: u32) {
println!("handled event");
}
}
/// The future I want to implement.
struct MyFuture {
state: State,
data: Receiver<Vec<u8>>,
events: Receiver<Vec<u8>>,
}
impl MyFuture {
fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
use Poll::*;
let MyFuture {
ref mut data,
ref mut state,
..
} = self.get_mut();
loop {
// this breaks, because Pin::new consume the mutable
// reference on the first iteration of the loop.
match Pin::new(data).poll_next(cx) {
Ready(Some(vec)) => state.update(vec),
Ready(None) => return Ready(()),
Pending => return Pending,
}
}
}
// unimplemented, but we basically have the same problem than with
// `poll_data()`
fn poll_events(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
unimplemented!()
}
}
impl Future for MyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if let Ready(_) = self.poll_data(cx) {
return Ready(());
}
// This does not work because self was consumed when
// self.poll_data() was called.
if let Ready(_) = self.poll_events(cx) {
return Ready(());
}
return Pending;
}
}
有没有办法修复该代码?如果不是,我可以使用什么模式来实现相同的逻辑?
您可以使用 Pin::as_mut
来避免消耗 Pin
。
impl MyFuture {
fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
use Poll::*;
let MyFuture {
ref mut data,
ref mut state,
..
} = self.get_mut();
let mut data = Pin::new(data); // Move pin here
loop {
match data.as_mut().poll_next(cx) { // Use in loop by calling `as_mut()`
Ready(Some(vec)) => state.update(vec),
Ready(None) => return Ready(()),
Pending => return Pending,
}
}
}
}
并在未来实现:
impl Future for MyFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
// `as_mut()` here to avoid consuming
if let Ready(_) = self.as_mut().poll_data(cx) {
return Ready(());
}
// can consume here as this is the last invocation
if let Ready(_) = self.poll_events(cx) {
return Ready(());
}
return Pending;
}
}
编辑:
提示:尽量只在必要时使用Pin
。在您的情况下,您实际上并不需要 poll_data
函数中的固定指针。 &mut self
就好了,它减少了 Pin
的使用:
impl MyFuture {
fn poll_data(&mut self, cx: &mut Context) -> Poll<()> {
use Poll::*;
loop {
match Pin::new(&mut self.data).poll_next(cx) {
Ready(Some(vec)) => self.state.update(vec),
Ready(None) => return Ready(()),
Pending => return Pending,
}
}
}
}
和未来实现:
impl Future for MyFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if let Ready(_) = self.poll_data(cx) {
return Ready(());
}
if let Ready(_) = self.poll_events(cx) {
return Ready(());
}
return Pending;
}
}
我正在迁移到 futures
0.3 和 tokio
0.2,并且有一个重复出现的模式我无法重新使用。我不确定这种模式是否已经过时,或者我是否对 Pin
.
通常我有一种类型,它包含一个套接字和几个通道接收器。此类结构的 Future
实现包括重复轮询流,直到它们 return Pending
(NotReady
在 0.1 生态系统中)。
然而,在 futures 0.3 中,Future::poll
和 Stream::poll_next
使用 self
而不是 &mut self
,并且这种模式不再有效:
use futures::{
stream::Stream,
task::{Context, Poll},
Future,
};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};
/// Dummy structure that represent some state we update when we
/// receive data or events.
struct State;
impl State {
fn update(&mut self, _data: Vec<u8>) {
println!("updated state");
}
fn handle_event(&mut self, _event: u32) {
println!("handled event");
}
}
/// The future I want to implement.
struct MyFuture {
state: State,
data: Receiver<Vec<u8>>,
events: Receiver<Vec<u8>>,
}
impl MyFuture {
fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
use Poll::*;
let MyFuture {
ref mut data,
ref mut state,
..
} = self.get_mut();
loop {
// this breaks, because Pin::new consume the mutable
// reference on the first iteration of the loop.
match Pin::new(data).poll_next(cx) {
Ready(Some(vec)) => state.update(vec),
Ready(None) => return Ready(()),
Pending => return Pending,
}
}
}
// unimplemented, but we basically have the same problem than with
// `poll_data()`
fn poll_events(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
unimplemented!()
}
}
impl Future for MyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if let Ready(_) = self.poll_data(cx) {
return Ready(());
}
// This does not work because self was consumed when
// self.poll_data() was called.
if let Ready(_) = self.poll_events(cx) {
return Ready(());
}
return Pending;
}
}
有没有办法修复该代码?如果不是,我可以使用什么模式来实现相同的逻辑?
您可以使用 Pin::as_mut
来避免消耗 Pin
。
impl MyFuture {
fn poll_data(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
use Poll::*;
let MyFuture {
ref mut data,
ref mut state,
..
} = self.get_mut();
let mut data = Pin::new(data); // Move pin here
loop {
match data.as_mut().poll_next(cx) { // Use in loop by calling `as_mut()`
Ready(Some(vec)) => state.update(vec),
Ready(None) => return Ready(()),
Pending => return Pending,
}
}
}
}
并在未来实现:
impl Future for MyFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
// `as_mut()` here to avoid consuming
if let Ready(_) = self.as_mut().poll_data(cx) {
return Ready(());
}
// can consume here as this is the last invocation
if let Ready(_) = self.poll_events(cx) {
return Ready(());
}
return Pending;
}
}
编辑:
提示:尽量只在必要时使用Pin
。在您的情况下,您实际上并不需要 poll_data
函数中的固定指针。 &mut self
就好了,它减少了 Pin
的使用:
impl MyFuture {
fn poll_data(&mut self, cx: &mut Context) -> Poll<()> {
use Poll::*;
loop {
match Pin::new(&mut self.data).poll_next(cx) {
Ready(Some(vec)) => self.state.update(vec),
Ready(None) => return Ready(()),
Pending => return Pending,
}
}
}
}
和未来实现:
impl Future for MyFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use Poll::*;
if let Ready(_) = self.poll_data(cx) {
return Ready(());
}
if let Ready(_) = self.poll_events(cx) {
return Ready(());
}
return Pending;
}
}