我如何测试绑定到 tokio TcpStream 的未来?
How can I test a future that is bound to a tokio TcpStream?
我有一个使用 LinesCodec
将 TCP 流包装在 Framed
中的未来。
当我尝试在测试中包装它时,大约有 20% 的时间我会阻塞未来,但是因为我没有在我尝试连接的套接字上侦听任何东西,所以我希望总能得到错误:
thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24 note: Run with 'RUST_BACKTRACE=1' for a backtrace.
这是我用过的测试代码:
#[macro_use(try_ready)]
extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;
struct MyFuture {
addr: SocketAddr,
}
impl Future for MyFuture {
type Item = Framed<TcpStream, LinesCodec>;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
let strm = try_ready!(TcpStream::connect(&self.addr).poll());
Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = MyFuture { addr: addr }
.and_then(|f| {
println!("connected");
let cn = f.get_ref();
cn.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
我在其他语言中看到过模式,其中测试二进制文件本身提供了一种机制来异步 return 结果,但还没有找到在 Rust 中使用类似机制的好方法。
问题不在于测试,而在于实施。
这个基于您的工作测试用例没有自定义未来实现,仅调用 TcpStream::connect()
。它如您所愿地工作。
extern crate futures;
extern crate tokio;
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::prelude::*;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = TcpStream::connect(&addr)
.and_then(|f| {
println!("connected");
f.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
您在 poll()
方法中一遍又一遍地连接到同一个端点。未来不是这样运作的。 poll()
方法将被重复调用,期望在某个时候它会 return Ok(Async::Ready(..))
或 Err(..)
。
如果每次调用 poll()
都启动一个新的 TCP 连接,则不太可能及时完成。
Here is a modified example that does what you expect:
#[macro_use(try_ready)]
extern crate futures;
extern crate tokio;
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::{ConnectFuture, TcpStream};
use tokio::prelude::*;
struct MyFuture {
tcp: ConnectFuture,
}
impl MyFuture {
fn new(addr: SocketAddr) -> MyFuture {
MyFuture {
tcp: TcpStream::connect(&addr),
}
}
}
impl Future for MyFuture {
type Item = Framed<TcpStream, LinesCodec>;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
let strm = try_ready!(self.tcp.poll());
Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = MyFuture::new(addr)
.and_then(|f| {
println!("connected");
let cn = f.get_ref();
cn.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
虽然我不确定你未来打算做什么;我无法评论这是否是正确的方法。
测试异步代码的一种简单方法可能是为每个测试使用专用运行时:启动它,等待将来完成并在测试结束时关闭运行时。
#[test]
fn my_case() {
// setup future f
// ...
tokio::run(f);
}
我不知道 Rust 生态系统中是否已经有统一的模式;请参阅 this discussion 关于对基于未来的代码的测试支持的演变。
为什么您的代码无法按预期工作
当您调用 poll()
时,将查询未来以检查值是否可用。
如果某个值不可用,则会注册一个兴趣,以便在发生可以解决未来问题的事情时再次调用 poll()
。
当您的 MyFuture::poll()
被调用时:
TcpStream::connect
创造新的未来TcpStreamNew
TcpStreamNew::poll
在第 1 步创建未来时仅 一次 立即调用。
- future 超出范围,所以下次调用
MyFuture::poll
时,您永远不会解析之前创建的 futures。
您已经注册了对未来的兴趣,如果您第一次投票时未解决,您将永远不会再次询问(投票)已解决的值或错误。
"nondeterministic" 行为的原因是因为第一个 poll
有时会立即解决并出现 ConnectionRefused
错误,有时它会永远等待未来的连接事件或失败永远不会被检索到。
看看Tokio使用的mio::sys::unix::tcp::TcpStream
:
impl TcpStream {
pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
set_nonblock(stream.as_raw_fd())?;
match stream.connect(addr) {
Ok(..) => {}
Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(e) => return Err(e),
}
Ok(TcpStream {
inner: stream,
})
}
当你在非阻塞套接字上connect
时,系统调用可能会立即connect/fail或return EINPROGRESS
,在最后一种情况下必须触发轮询用于检索错误的值。
在某种程度上,您可以加入 tokio 的测试库以简化此过程;它在单元测试中支持 async/await。
#[tokio::test]
async fn my_future_test() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
match MyFuture { addr }.poll().await {
Ok(f) => assert!("something good")
Err(e) => assert!("something bad")
}
}
我有一个使用 LinesCodec
将 TCP 流包装在 Framed
中的未来。
当我尝试在测试中包装它时,大约有 20% 的时间我会阻塞未来,但是因为我没有在我尝试连接的套接字上侦听任何东西,所以我希望总能得到错误:
thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24 note: Run with 'RUST_BACKTRACE=1' for a backtrace.
这是我用过的测试代码:
#[macro_use(try_ready)]
extern crate futures; // 0.1.24
extern crate tokio; // 0.1.8
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;
struct MyFuture {
addr: SocketAddr,
}
impl Future for MyFuture {
type Item = Framed<TcpStream, LinesCodec>;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
let strm = try_ready!(TcpStream::connect(&self.addr).poll());
Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = MyFuture { addr: addr }
.and_then(|f| {
println!("connected");
let cn = f.get_ref();
cn.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
我在其他语言中看到过模式,其中测试二进制文件本身提供了一种机制来异步 return 结果,但还没有找到在 Rust 中使用类似机制的好方法。
问题不在于测试,而在于实施。
这个基于您的工作测试用例没有自定义未来实现,仅调用 TcpStream::connect()
。它如您所愿地工作。
extern crate futures;
extern crate tokio;
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::prelude::*;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = TcpStream::connect(&addr)
.and_then(|f| {
println!("connected");
f.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
您在 poll()
方法中一遍又一遍地连接到同一个端点。未来不是这样运作的。 poll()
方法将被重复调用,期望在某个时候它会 return Ok(Async::Ready(..))
或 Err(..)
。
如果每次调用 poll()
都启动一个新的 TCP 连接,则不太可能及时完成。
Here is a modified example that does what you expect:
#[macro_use(try_ready)]
extern crate futures;
extern crate tokio;
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::{ConnectFuture, TcpStream};
use tokio::prelude::*;
struct MyFuture {
tcp: ConnectFuture,
}
impl MyFuture {
fn new(addr: SocketAddr) -> MyFuture {
MyFuture {
tcp: TcpStream::connect(&addr),
}
}
}
impl Future for MyFuture {
type Item = Framed<TcpStream, LinesCodec>;
type Error = io::Error;
fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
let strm = try_ready!(self.tcp.poll());
Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Shutdown;
#[test]
fn connect() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
let fut = MyFuture::new(addr)
.and_then(|f| {
println!("connected");
let cn = f.get_ref();
cn.shutdown(Shutdown::Both)
}).map_err(|e| panic!("error: {:?}", e));
tokio::run(fut)
}
}
虽然我不确定你未来打算做什么;我无法评论这是否是正确的方法。
测试异步代码的一种简单方法可能是为每个测试使用专用运行时:启动它,等待将来完成并在测试结束时关闭运行时。
#[test]
fn my_case() {
// setup future f
// ...
tokio::run(f);
}
我不知道 Rust 生态系统中是否已经有统一的模式;请参阅 this discussion 关于对基于未来的代码的测试支持的演变。
为什么您的代码无法按预期工作
当您调用 poll()
时,将查询未来以检查值是否可用。
如果某个值不可用,则会注册一个兴趣,以便在发生可以解决未来问题的事情时再次调用 poll()
。
当您的 MyFuture::poll()
被调用时:
TcpStream::connect
创造新的未来TcpStreamNew
TcpStreamNew::poll
在第 1 步创建未来时仅 一次 立即调用。- future 超出范围,所以下次调用
MyFuture::poll
时,您永远不会解析之前创建的 futures。
您已经注册了对未来的兴趣,如果您第一次投票时未解决,您将永远不会再次询问(投票)已解决的值或错误。
"nondeterministic" 行为的原因是因为第一个 poll
有时会立即解决并出现 ConnectionRefused
错误,有时它会永远等待未来的连接事件或失败永远不会被检索到。
看看Tokio使用的mio::sys::unix::tcp::TcpStream
:
impl TcpStream {
pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
set_nonblock(stream.as_raw_fd())?;
match stream.connect(addr) {
Ok(..) => {}
Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
Err(e) => return Err(e),
}
Ok(TcpStream {
inner: stream,
})
}
当你在非阻塞套接字上connect
时,系统调用可能会立即connect/fail或return EINPROGRESS
,在最后一种情况下必须触发轮询用于检索错误的值。
在某种程度上,您可以加入 tokio 的测试库以简化此过程;它在单元测试中支持 async/await。
#[tokio::test]
async fn my_future_test() {
let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
match MyFuture { addr }.poll().await {
Ok(f) => assert!("something good")
Err(e) => assert!("something bad")
}
}