如何在 Rust 中对 future 和 stream 进行 select?
How to select between a future and stream in Rust?
我刚开始在 Rust 中尝试使用 futures/tokio。我已经能用单独的 future 或 stream 完成一些非常基础的操作,现在想知道如何在 future 和 stream 之间进行 select。
如何扩展 tokio 文档中的玩具示例,使用 tokio_timer::Timer 执行带超时的 HTTPS 请求?
extern crate futures; // v0.1 (old)
extern crate native_tls;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_tls;
use std::io;
use std::net::ToSocketAddrs;
use futures::Future;
use native_tls::TlsConnector;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_tls::TlsConnectorExt;
/// Fetches `https://www.rust-lang.org/` over TLS and prints the raw response.
///
/// The exchange is written as a single chain of futures 0.1 combinators and
/// driven to completion on a `tokio_core` reactor. Every failure along the
/// way (resolution, connect, handshake, I/O) panics through `unwrap`.
fn main() {
    let mut event_loop = Core::new().unwrap();
    let handle = event_loop.handle();

    // Resolve the host synchronously and use the first address returned.
    let server_addr = "www.rust-lang.org:443"
        .to_socket_addrs()
        .unwrap()
        .next()
        .unwrap();
    let connector = TlsConnector::builder().unwrap().build().unwrap();

    // TCP connect -> TLS handshake -> send request -> read until EOF.
    let exchange = TcpStream::connect(&server_addr, &handle)
        .and_then(|tcp| {
            connector
                .connect_async("www.rust-lang.org", tcp)
                // Convert the handshake error into an `io::Error` so the
                // whole chain keeps a single error type.
                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
        })
        .and_then(|tls| {
            tokio_io::io::write_all(tls, "\
GET / HTTP/1.0\r\n\
Host: www.rust-lang.org\r\n\
\r\n\
".as_bytes())
        })
        .and_then(|(tls, _sent)| tokio_io::io::read_to_end(tls, Vec::new()));

    // Nothing above has run yet; the reactor drives the chain here.
    let (_socket, body) = event_loop.run(exchange).unwrap();
    println!("{}", String::from_utf8_lossy(&body));
}
您可以使用 FutureExt::into_stream 将 Future 转换为 Stream,然后在两个流上使用 select:
use futures::prelude::*; // 0.3.1
/// Combines a stream and a single future into one stream.
///
/// The future is first converted into a stream with `into_stream`, and that
/// stream is then merged with `stream` using `stream::select`. The future's
/// output type must equal the stream's item type.
fn select_stream_or_future_as_stream<S, F>(stream: S, future: F) -> impl Stream<Item = S::Item>
where
    S: Stream,
    F: Future<Output = S::Item>,
{
    let future_as_stream = future.into_stream();
    stream::select(future_as_stream, stream)
}
另请参阅:
下面是我改编的示例代码,可能对初学者有用。
// Timed HTTPS request: races the request chain against a 950 ms timer.
// This is a fragment; `core`, `is_ipv4` and the imports are defined elsewhere.
let timer = tokio_timer::Timer::default();
// Error out when the timeout is reached: once the sleep finishes, `then`
// replaces its outcome with a "Timeout" I/O error.
let timeout = timer.sleep(time::Duration::from_millis(950)).then(|_| {
    future::err(io::Error::new(io::ErrorKind::Other, "Timeout"))
});
let handle = core.handle();
// this returns IoFuture = BoxFuture<T, io::Error>;
let addresses = tokio_dns::CpuPoolResolver::new(1).resolve("www.google.cz");
let socket = addresses
    .and_then(|all_addresses| {
        // Pick the first IPv4 address. Report an error through the future
        // chain instead of panicking when the resolver returned none.
        all_addresses
            .iter()
            .find(|x| is_ipv4(**x))
            .cloned()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "No IPv4 address found"))
    })
    .and_then(|addr| {
        TcpStream::connect(&SocketAddr::new(addr, 443), &handle).map_err(|e| {
            println!("{:?}", e);
            io::Error::new(io::ErrorKind::Other, e)
        })
    });
let tls_handshake = socket.and_then(|socket| {
    println!("Got socket");
    let cx = TlsConnector::builder().unwrap().build().unwrap();
    let tls = cx.connect_async("www.google.cz", socket);
    // Convert the handshake error into an `io::Error` so every stage of the
    // chain shares one error type.
    tls.map_err(|e| {
        println!("{:?}", e);
        io::Error::new(io::ErrorKind::Other, e)
    })
});
let request = tls_handshake.and_then(|socket| {
    println!("SSL Handshake Successful");
    tokio_io::io::write_all(socket, "\
GET / HTTP/1.0\r\n\
Host: www.google.cz\r\n\
\r\n\
".as_bytes())
        // Futures are lazy: log from `map` so the message appears only after
        // the write has actually completed, not when the future is created.
        .map(|written| {
            println!("Wrote to socket");
            written
        })
        .map_err(|e| {
            println!("{:?}", e);
            io::Error::new(io::ErrorKind::Other, e)
        })
});
let response = request.and_then(|(socket, _request)| {
    // Same reasoning as above: log once the read has finished.
    tokio_io::io::read_to_end(socket, Vec::new()).map(|read| {
        println!("Read till end of socket");
        read
    })
});
// Whichever of `response` and `timeout` finishes first decides the result;
// on success keep only the bytes that were read.
let waiter = response
    .select(timeout)
    .map(|((_socket, data), _)| data);
let result = core.run(waiter);
我刚开始在 Rust 中尝试使用 futures/tokio。我已经能用单独的 future 或 stream 完成一些非常基础的操作,现在想知道如何在 future 和 stream 之间进行 select。
如何扩展 tokio 文档中的玩具示例,使用 tokio_timer::Timer 执行带超时的 HTTPS 请求?
extern crate futures; // v0.1 (old)
extern crate native_tls;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_tls;
use std::io;
use std::net::ToSocketAddrs;
use futures::Future;
use native_tls::TlsConnector;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_tls::TlsConnectorExt;
/// Fetches `https://www.rust-lang.org/` over TLS and prints the raw response.
///
/// The exchange is written as a single chain of futures 0.1 combinators and
/// driven to completion on a `tokio_core` reactor. Every failure along the
/// way (resolution, connect, handshake, I/O) panics through `unwrap`.
fn main() {
    let mut event_loop = Core::new().unwrap();
    let handle = event_loop.handle();

    // Resolve the host synchronously and use the first address returned.
    let server_addr = "www.rust-lang.org:443"
        .to_socket_addrs()
        .unwrap()
        .next()
        .unwrap();
    let connector = TlsConnector::builder().unwrap().build().unwrap();

    // TCP connect -> TLS handshake -> send request -> read until EOF.
    let exchange = TcpStream::connect(&server_addr, &handle)
        .and_then(|tcp| {
            connector
                .connect_async("www.rust-lang.org", tcp)
                // Convert the handshake error into an `io::Error` so the
                // whole chain keeps a single error type.
                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
        })
        .and_then(|tls| {
            tokio_io::io::write_all(tls, "\
GET / HTTP/1.0\r\n\
Host: www.rust-lang.org\r\n\
\r\n\
".as_bytes())
        })
        .and_then(|(tls, _sent)| tokio_io::io::read_to_end(tls, Vec::new()));

    // Nothing above has run yet; the reactor drives the chain here.
    let (_socket, body) = event_loop.run(exchange).unwrap();
    println!("{}", String::from_utf8_lossy(&body));
}
您可以使用 FutureExt::into_stream 将 Future 转换为 Stream,然后在两个流上使用 select:
use futures::prelude::*; // 0.3.1
/// Combines a stream and a single future into one stream.
///
/// The future is first converted into a stream with `into_stream`, and that
/// stream is then merged with `stream` using `stream::select`. The future's
/// output type must equal the stream's item type.
fn select_stream_or_future_as_stream<S, F>(stream: S, future: F) -> impl Stream<Item = S::Item>
where
    S: Stream,
    F: Future<Output = S::Item>,
{
    let future_as_stream = future.into_stream();
    stream::select(future_as_stream, stream)
}
另请参阅:
下面是我改编的示例代码,可能对初学者有用。
// Timed HTTPS request: races the request chain against a 950 ms timer.
// This is a fragment; `core`, `is_ipv4` and the imports are defined elsewhere.
let timer = tokio_timer::Timer::default();
// Error out when the timeout is reached: once the sleep finishes, `then`
// replaces its outcome with a "Timeout" I/O error.
let timeout = timer.sleep(time::Duration::from_millis(950)).then(|_| {
    future::err(io::Error::new(io::ErrorKind::Other, "Timeout"))
});
let handle = core.handle();
// this returns IoFuture = BoxFuture<T, io::Error>;
let addresses = tokio_dns::CpuPoolResolver::new(1).resolve("www.google.cz");
let socket = addresses
    .and_then(|all_addresses| {
        // Pick the first IPv4 address. Report an error through the future
        // chain instead of panicking when the resolver returned none.
        all_addresses
            .iter()
            .find(|x| is_ipv4(**x))
            .cloned()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "No IPv4 address found"))
    })
    .and_then(|addr| {
        TcpStream::connect(&SocketAddr::new(addr, 443), &handle).map_err(|e| {
            println!("{:?}", e);
            io::Error::new(io::ErrorKind::Other, e)
        })
    });
let tls_handshake = socket.and_then(|socket| {
    println!("Got socket");
    let cx = TlsConnector::builder().unwrap().build().unwrap();
    let tls = cx.connect_async("www.google.cz", socket);
    // Convert the handshake error into an `io::Error` so every stage of the
    // chain shares one error type.
    tls.map_err(|e| {
        println!("{:?}", e);
        io::Error::new(io::ErrorKind::Other, e)
    })
});
let request = tls_handshake.and_then(|socket| {
    println!("SSL Handshake Successful");
    tokio_io::io::write_all(socket, "\
GET / HTTP/1.0\r\n\
Host: www.google.cz\r\n\
\r\n\
".as_bytes())
        // Futures are lazy: log from `map` so the message appears only after
        // the write has actually completed, not when the future is created.
        .map(|written| {
            println!("Wrote to socket");
            written
        })
        .map_err(|e| {
            println!("{:?}", e);
            io::Error::new(io::ErrorKind::Other, e)
        })
});
let response = request.and_then(|(socket, _request)| {
    // Same reasoning as above: log once the read has finished.
    tokio_io::io::read_to_end(socket, Vec::new()).map(|read| {
        println!("Read till end of socket");
        read
    })
});
// Whichever of `response` and `timeout` finishes first decides the result;
// on success keep only the bytes that were read.
let waiter = response
    .select(timeout)
    .map(|((_socket, data), _)| data);
let result = core.run(waiter);