如何向 tokio-io 添加特殊的 NotReady 逻辑?
How to add special NotReady logic to tokio-io?
我正在尝试制作一个 Stream
等待特定字符进入缓冲区。我知道 BufRead
上有 read_until()
但我实际上需要一个自定义解决方案,因为这是实现等待缓冲区中的特定字符串(或者,例如,发生正则表达式匹配)的垫脚石。
在我第一次遇到问题的项目中,问题是当我从内部未来获得 Ready(_)
并从我的函数获得 return NotReady
时,未来处理只是挂起。我发现我不应该那样做 per docs(最后一段)。但是,我没有得到的是该段中承诺的实际替代方案是什么。我阅读了 Tokio 网站上所有已发布的文档,目前对我来说没有意义。
下面是我当前的代码。不幸的是,我不能让它更简单和更小,因为它已经坏了。当前结果是这样的:
Err(Custom { kind: Other, error: Error(Shutdown) })
Err(Custom { kind: Other, error: Error(Shutdown) })
Err(Custom { kind: Other, error: Error(Shutdown) })
<ad infinum>
预期结果是从中得到一些 Ok(Ready(_))
,同时打印 W
和 W'
,并等待缓冲区中的特定字符。
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_io_timeout;
extern crate tokio_process;
use futures::stream::poll_fn;
use futures::{Async, Poll, Stream};
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
use tokio_io_timeout::TimeoutReader;
use tokio_process::CommandExt;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct Process {
child: tokio_process::Child,
stdout: Arc<Mutex<tokio_io_timeout::TimeoutReader<tokio_process::ChildStdout>>>,
}
impl Process {
fn new(
command: &str,
reader_timeout: Option<Duration>,
core: &tokio_core::reactor::Core,
) -> Self {
let mut cmd = Command::new(command);
let cat = cmd.stdout(Stdio::piped());
let mut child = cat.spawn_async(&core.handle()).unwrap();
let stdout = child.stdout().take().unwrap();
let mut timeout_reader = TimeoutReader::new(stdout);
timeout_reader.set_timeout(reader_timeout);
let timeout_reader = Arc::new(Mutex::new(timeout_reader));
Self {
child,
stdout: timeout_reader,
}
}
}
fn work() -> Result<(), ()> {
let window = Arc::new(Mutex::new(Vec::new()));
let mut core = Core::new().unwrap();
let process = Process::new("cat", Some(Duration::from_secs(20)), &core);
let mark = Arc::new(Mutex::new(b'c'));
let read_until_stream = poll_fn({
let window = window.clone();
let timeout_reader = process.stdout.clone();
move || -> Poll<Option<u8>, std::io::Error> {
let mut buf = [0; 8];
let poll;
{
let mut timeout_reader = timeout_reader.lock().unwrap();
poll = timeout_reader.poll_read(&mut buf);
}
match poll {
Ok(Async::Ready(0)) => Ok(Async::Ready(None)),
Ok(Async::Ready(x)) => {
{
let mut window = window.lock().unwrap();
println!("W: {:?}", *window);
println!("buf: {:?}", &buf[0..x]);
window.extend(buf[0..x].into_iter().map(|x| *x));
println!("W': {:?}", *window);
if let Some(_) = window.iter().find(|c| **c == *mark.lock().unwrap()) {
Ok(Async::Ready(Some(1)))
} else {
Ok(Async::NotReady)
}
}
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
});
let _stream_thread = thread::spawn(move || {
for o in read_until_stream.wait() {
println!("{:?}", o);
}
});
match core.run(process.child) {
Ok(_) => {}
Err(e) => {
println!("Child error: {:?}", e);
}
}
Ok(())
}
fn main() {
work().unwrap();
}
如果您需要更多数据,您需要再次调用 poll_read
,直到找到您要查找的内容或 poll_read
returns NotReady
.
您可能希望避免在一个任务中循环太久,因此您可以自己构建一个 yield_task
函数,以便在 poll_read
没有 return [=16] 时调用=];它确保在其他待处理任务 运行.
之后尽快再次调用您的任务
只需使用它 运行 return yield_task();
.
fn yield_inner() {
use futures::task;
task::current().notify();
}
#[inline(always)]
pub fn yield_task<T, E>() -> Poll<T, E> {
yield_inner();
Ok(Async::NotReady)
}
另见 futures-rs#354: Handle long-running, always-ready futures fairly #354。
随着新的async/awaitAPIfutures::task::current
没了;相反,你需要一个 std::task::Context
reference, which is provided as parameter to the new std::future::Future::poll
特征方法。
如果您已经手动实现了 std::future::Future
特征,您可以简单地插入:
context.waker().wake_by_ref();
return std::task::Poll::Pending;
或者自己构建一个只产生一次的 Future
实现类型:
pub struct Yield {
ready: bool,
}
impl core::future::Future for Yield {
type Output = ();
fn poll(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Self::Output> {
let this = self.get_mut();
if this.ready {
core::task::Poll::Ready(())
} else {
cx.waker().wake_by_ref();
this.ready = true; // ready next round
core::task::Poll::Pending
}
}
}
pub fn yield_task() -> Yield {
Yield { ready: false }
}
然后在 async
代码中使用它,如下所示:
yield_task().await;
我正在尝试制作一个 Stream
等待特定字符进入缓冲区。我知道 BufRead
上有 read_until()
但我实际上需要一个自定义解决方案,因为这是实现等待缓冲区中的特定字符串(或者,例如,发生正则表达式匹配)的垫脚石。
在我第一次遇到问题的项目中,问题是当我从内部未来获得 Ready(_)
并从我的函数获得 return NotReady
时,未来处理只是挂起。我发现我不应该那样做 per docs(最后一段)。但是,我没有得到的是该段中承诺的实际替代方案是什么。我阅读了 Tokio 网站上所有已发布的文档,目前对我来说没有意义。
下面是我当前的代码。不幸的是,我不能让它更简单和更小,因为它已经坏了。当前结果是这样的:
Err(Custom { kind: Other, error: Error(Shutdown) })
Err(Custom { kind: Other, error: Error(Shutdown) })
Err(Custom { kind: Other, error: Error(Shutdown) })
<ad infinum>
预期结果是从中得到一些 Ok(Ready(_))
,同时打印 W
和 W'
,并等待缓冲区中的特定字符。
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_io_timeout;
extern crate tokio_process;
use futures::stream::poll_fn;
use futures::{Async, Poll, Stream};
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
use tokio_io_timeout::TimeoutReader;
use tokio_process::CommandExt;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct Process {
child: tokio_process::Child,
stdout: Arc<Mutex<tokio_io_timeout::TimeoutReader<tokio_process::ChildStdout>>>,
}
impl Process {
fn new(
command: &str,
reader_timeout: Option<Duration>,
core: &tokio_core::reactor::Core,
) -> Self {
let mut cmd = Command::new(command);
let cat = cmd.stdout(Stdio::piped());
let mut child = cat.spawn_async(&core.handle()).unwrap();
let stdout = child.stdout().take().unwrap();
let mut timeout_reader = TimeoutReader::new(stdout);
timeout_reader.set_timeout(reader_timeout);
let timeout_reader = Arc::new(Mutex::new(timeout_reader));
Self {
child,
stdout: timeout_reader,
}
}
}
fn work() -> Result<(), ()> {
let window = Arc::new(Mutex::new(Vec::new()));
let mut core = Core::new().unwrap();
let process = Process::new("cat", Some(Duration::from_secs(20)), &core);
let mark = Arc::new(Mutex::new(b'c'));
let read_until_stream = poll_fn({
let window = window.clone();
let timeout_reader = process.stdout.clone();
move || -> Poll<Option<u8>, std::io::Error> {
let mut buf = [0; 8];
let poll;
{
let mut timeout_reader = timeout_reader.lock().unwrap();
poll = timeout_reader.poll_read(&mut buf);
}
match poll {
Ok(Async::Ready(0)) => Ok(Async::Ready(None)),
Ok(Async::Ready(x)) => {
{
let mut window = window.lock().unwrap();
println!("W: {:?}", *window);
println!("buf: {:?}", &buf[0..x]);
window.extend(buf[0..x].into_iter().map(|x| *x));
println!("W': {:?}", *window);
if let Some(_) = window.iter().find(|c| **c == *mark.lock().unwrap()) {
Ok(Async::Ready(Some(1)))
} else {
Ok(Async::NotReady)
}
}
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
});
let _stream_thread = thread::spawn(move || {
for o in read_until_stream.wait() {
println!("{:?}", o);
}
});
match core.run(process.child) {
Ok(_) => {}
Err(e) => {
println!("Child error: {:?}", e);
}
}
Ok(())
}
fn main() {
work().unwrap();
}
如果您需要更多数据,您需要再次调用 poll_read
,直到找到您要查找的内容或 poll_read
returns NotReady
.
您可能希望避免在一个任务中循环太久,因此您可以自己构建一个 yield_task
函数,以便在 poll_read
没有 return [=16] 时调用=];它确保在其他待处理任务 运行.
只需使用它 运行 return yield_task();
.
fn yield_inner() {
use futures::task;
task::current().notify();
}
#[inline(always)]
pub fn yield_task<T, E>() -> Poll<T, E> {
yield_inner();
Ok(Async::NotReady)
}
另见 futures-rs#354: Handle long-running, always-ready futures fairly #354。
随着新的async/awaitAPIfutures::task::current
没了;相反,你需要一个 std::task::Context
reference, which is provided as parameter to the new std::future::Future::poll
特征方法。
如果您已经手动实现了 std::future::Future
特征,您可以简单地插入:
context.waker().wake_by_ref();
return std::task::Poll::Pending;
或者自己构建一个只产生一次的 Future
实现类型:
pub struct Yield {
ready: bool,
}
impl core::future::Future for Yield {
type Output = ();
fn poll(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Self::Output> {
let this = self.get_mut();
if this.ready {
core::task::Poll::Ready(())
} else {
cx.waker().wake_by_ref();
this.ready = true; // ready next round
core::task::Poll::Pending
}
}
}
pub fn yield_task() -> Yield {
Yield { ready: false }
}
然后在 async
代码中使用它,如下所示:
yield_task().await;