如何使用 Tokio 实现基于拉取的系统?
How can I implement a pull-based system using Tokio?
我想在服务器和客户端之间实现一个基于拉的系统,其中服务器只会在客户端请求时推送数据。
我在玩 Tokio 并且能够创建一个基于推送的系统,我能够以 1 毫秒的间隔推送一个字符串。
let done = listener
.incoming()
.for_each(move |socket| {
let server_queue = _cqueue.clone();
let (reader, mut writer) = socket.split();
let sender = Interval::new_interval(std::time::Duration::from_millis(1))
.for_each(move |_| {
writer
.poll_write(server_queue.pull().borrow())
.map_err(|_| {
tokio::timer::Error::shutdown();
})
.unwrap();
return Ok(());
})
.map_err(|e| println!("{}", e));
;
tokio::spawn(sender);
return Ok(());
})
.map_err(|e| println!("Future_error {}", e));
有没有办法只在客户要求时才发送,而不必使用 reader?
让我们回想一下可能导致这种情况的事件类型 "sending of data"。你可以想出多种办法:
- 客户端连接到服务器。根据合同,这是 "asking for data"。你已经实现了这个案例
- 客户端在连接客户端和服务器的 socket/pipe 上发送带内消息。为此,您需要使用
socket
的 AsyncRead
部分,以及您已经使用过的 AsyncWrite
部分,并构建一个双工通道,以便您可以同时阅读和交谈
- 客户端发送带外消息,通常在另一个原型主机端口三元组上并使用不同的协议。您当前的服务器识别它,并向客户端发送该数据。为此,您需要一个 reader 用于另一个三元组,并且您需要一个消息传递结构来将其中继到可以访问套接字
AsyncWrite
部分的一个地方
简短的回答是否定的,您不能真正对您没有监听的事件采取行动。
@Shepmaster I was just wondering if there was an existing library that can be used to handle this "neatly"
有,然后没有。
大多数图书馆都以特定问题为中心。在您的情况下,您选择了通过使用 TCP 套接字(实现 AsyncRead + AsyncWrite
)在尽可能低的级别上工作。
做任何事情,你需要决定:
- 一种传输格式
- 协议
当我需要快速而肮脏的双工流实现时,我倾向于将代码包装到其中:
use futures::sync::mpsc::{UnboundedSender, unbounded};
use std::sync::{Arc};
use futures::{Sink, Stream, Future, future, stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::codec::{Framed, Encoder, Decoder};
use std::io;
use std::fmt::Debug;
use futures_locks::{RwLock as FutLock};
enum Message<T:Send+Debug+'static> {
Content(T),
Done
}
impl<T: Send + Debug + 'static> From<T> for Message<T> {
fn from(message:T) -> Message<T> {
Message::Content(message)
}
}
struct DuplexStream<T:Send+Debug+'static> {
writer: Arc<FutLock<UnboundedSender<Message<T>>>>,
handlers: Arc<FutLock<Option<Box<dyn Stream<Item = Message<T>, Error = ()> + Send>>>>
}
impl<T:Send+Debug+'static> DuplexStream<T> {
pub fn from<R,U>(framed_socket: Framed<R, U>) -> Arc<DuplexStream<T>>
where U: Send + Encoder<Item = T> + Decoder<Item = T> + 'static, R: Send + AsyncRead + AsyncWrite + 'static {
let (tx, rx) = framed_socket.split();
// Assemble the combined upstream stream
let (upstream_tx, upstream_rx) = unbounded();
let upstream = upstream_rx.take_while(|item| match item {
Message::Done => future::ok(false),
_ => future::ok(true)
}).fold(tx, |o, m| {
o.send(match m {
Message::Content(i) => i,
_ => unreachable!()
}).map_err(|_| {
()
})
}).map(|e| {
Message::Done
}).into_stream();
// Assemble the downstream stream
let downstream = rx.map_err(|_| ()).map(|r| {
Message::Content(r)
}).chain(stream::once(Ok(Message::Done)));
Arc::new(DuplexStream {
writer: Arc::new(FutLock::new(upstream_tx)),
handlers: Arc::new(FutLock::new(Some(Box::new(upstream.select(downstream).take_while(|m| match m {
Message::Content(_) => {
future::ok(true)
},
Message::Done => {
future::ok(false)
}
})))))
})
}
pub fn start(self: Arc<Self>) -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
Box::new(self.handlers
.write()
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))
.map(|mut handler| -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
match handler.take() {
Some(e) => Box::new(e.map(|r| match r {
Message::Content(i) => i,
_ => unreachable!()
}).map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))),
None => Box::new(stream::once(Err(io::Error::new(io::ErrorKind::AddrInUse, "Handler already taken"))))
}
}).into_stream().flatten()
)
}
pub fn close(self: Arc<Self>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
self.inner_send(Message::Done)
}
pub fn send(self: Arc<Self>, message: T) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
self.inner_send(message.into())
}
pub fn inner_send(self: Arc<Self>, message: Message<T>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
Box::new(self.writer.write()
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "The mutex has disappeared")).and_then(|guard| {
future::result(guard.unbounded_send(message).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "The sink has gone away")))
}))
}
}
这个结构有很多优点,但也有一些缺点。主要优点是您可以像处理另一种语言一样处理同一对象 上的读写部分。对象本身实现了 Clone
(因为它是一个 Arc
),每个方法都可以在任何地方使用(对旧的 futures
代码特别有用)并且只要你保留它的副本 某处并且不调用close()
它会保持运行(只要底层AsyncRead + AsyncWrite
实现仍然存在)。
这并不能免除您第 1 点和第 2 点的责任,但您可以(并且应该)利用 tokio::codec::Framed
实现第 1 点,并将第 2 点实现为业务逻辑。
用法示例(实际上是测试 ;-) ):
#[test]
fn it_writes() {
let stream = DuplexStream::from(make_w());
let stream_write = Arc::clone(&stream);
let stream_read= Arc::clone(&stream);
let dup = Arc::clone(&stream);
tokio::run(lazy(move || {
let stream_write = Arc::clone(&stream_write);
stream_read.start().and_then(move |i| {
let stream_write = Arc::clone(&stream_write);
stream_write.send("foo".to_string()).map(|_| i)
}).collect().map(|r| {
assert_eq!(r, vec!["foo".to_string(), "bar".to_string(), "bazfoo".to_string(), "foo".to_string()])
}).map_err(|_| {
assert_eq!(true, false);
})
}));
}
我想在服务器和客户端之间实现一个基于拉的系统,其中服务器只会在客户端请求时推送数据。
我在玩 Tokio 并且能够创建一个基于推送的系统,我能够以 1 毫秒的间隔推送一个字符串。
let done = listener
.incoming()
.for_each(move |socket| {
let server_queue = _cqueue.clone();
let (reader, mut writer) = socket.split();
let sender = Interval::new_interval(std::time::Duration::from_millis(1))
.for_each(move |_| {
writer
.poll_write(server_queue.pull().borrow())
.map_err(|_| {
tokio::timer::Error::shutdown();
})
.unwrap();
return Ok(());
})
.map_err(|e| println!("{}", e));
;
tokio::spawn(sender);
return Ok(());
})
.map_err(|e| println!("Future_error {}", e));
有没有办法只在客户要求时才发送,而不必使用 reader?
让我们回想一下可能导致这种情况的事件类型 "sending of data"。你可以想出多种办法:
- 客户端连接到服务器。根据合同,这是 "asking for data"。你已经实现了这个案例
- 客户端在连接客户端和服务器的 socket/pipe 上发送带内消息。为此,您需要使用
socket
的AsyncRead
部分,以及您已经使用过的AsyncWrite
部分,并构建一个双工通道,以便您可以同时阅读和交谈 - 客户端发送带外消息,通常在另一个原型主机端口三元组上并使用不同的协议。您当前的服务器识别它,并向客户端发送该数据。为此,您需要一个 reader 用于另一个三元组,并且您需要一个消息传递结构来将其中继到可以访问套接字
AsyncWrite
部分的一个地方
简短的回答是否定的,您不能真正对您没有监听的事件采取行动。
@Shepmaster I was just wondering if there was an existing library that can be used to handle this "neatly"
有,然后没有。
大多数图书馆都以特定问题为中心。在您的情况下,您选择了通过使用 TCP 套接字(实现 AsyncRead + AsyncWrite
)在尽可能低的级别上工作。
做任何事情,你需要决定:
- 一种传输格式
- 协议
当我需要快速而肮脏的双工流实现时,我倾向于将代码包装到其中:
use futures::sync::mpsc::{UnboundedSender, unbounded};
use std::sync::{Arc};
use futures::{Sink, Stream, Future, future, stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::codec::{Framed, Encoder, Decoder};
use std::io;
use std::fmt::Debug;
use futures_locks::{RwLock as FutLock};
enum Message<T:Send+Debug+'static> {
Content(T),
Done
}
impl<T: Send + Debug + 'static> From<T> for Message<T> {
fn from(message:T) -> Message<T> {
Message::Content(message)
}
}
struct DuplexStream<T:Send+Debug+'static> {
writer: Arc<FutLock<UnboundedSender<Message<T>>>>,
handlers: Arc<FutLock<Option<Box<dyn Stream<Item = Message<T>, Error = ()> + Send>>>>
}
impl<T:Send+Debug+'static> DuplexStream<T> {
pub fn from<R,U>(framed_socket: Framed<R, U>) -> Arc<DuplexStream<T>>
where U: Send + Encoder<Item = T> + Decoder<Item = T> + 'static, R: Send + AsyncRead + AsyncWrite + 'static {
let (tx, rx) = framed_socket.split();
// Assemble the combined upstream stream
let (upstream_tx, upstream_rx) = unbounded();
let upstream = upstream_rx.take_while(|item| match item {
Message::Done => future::ok(false),
_ => future::ok(true)
}).fold(tx, |o, m| {
o.send(match m {
Message::Content(i) => i,
_ => unreachable!()
}).map_err(|_| {
()
})
}).map(|e| {
Message::Done
}).into_stream();
// Assemble the downstream stream
let downstream = rx.map_err(|_| ()).map(|r| {
Message::Content(r)
}).chain(stream::once(Ok(Message::Done)));
Arc::new(DuplexStream {
writer: Arc::new(FutLock::new(upstream_tx)),
handlers: Arc::new(FutLock::new(Some(Box::new(upstream.select(downstream).take_while(|m| match m {
Message::Content(_) => {
future::ok(true)
},
Message::Done => {
future::ok(false)
}
})))))
})
}
pub fn start(self: Arc<Self>) -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
Box::new(self.handlers
.write()
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))
.map(|mut handler| -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
match handler.take() {
Some(e) => Box::new(e.map(|r| match r {
Message::Content(i) => i,
_ => unreachable!()
}).map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))),
None => Box::new(stream::once(Err(io::Error::new(io::ErrorKind::AddrInUse, "Handler already taken"))))
}
}).into_stream().flatten()
)
}
pub fn close(self: Arc<Self>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
self.inner_send(Message::Done)
}
pub fn send(self: Arc<Self>, message: T) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
self.inner_send(message.into())
}
pub fn inner_send(self: Arc<Self>, message: Message<T>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
Box::new(self.writer.write()
.map_err(|_| io::Error::new(io::ErrorKind::NotFound, "The mutex has disappeared")).and_then(|guard| {
future::result(guard.unbounded_send(message).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "The sink has gone away")))
}))
}
}
这个结构有很多优点,但也有一些缺点。主要优点是您可以像处理另一种语言一样处理同一对象 上的读写部分。对象本身实现了 Clone
(因为它是一个 Arc
),每个方法都可以在任何地方使用(对旧的 futures
代码特别有用)并且只要你保留它的副本 某处并且不调用close()
它会保持运行(只要底层AsyncRead + AsyncWrite
实现仍然存在)。
这并不能免除您第 1 点和第 2 点的责任,但您可以(并且应该)利用 tokio::codec::Framed
实现第 1 点,并将第 2 点实现为业务逻辑。
用法示例(实际上是测试 ;-) ):
#[test]
fn it_writes() {
let stream = DuplexStream::from(make_w());
let stream_write = Arc::clone(&stream);
let stream_read= Arc::clone(&stream);
let dup = Arc::clone(&stream);
tokio::run(lazy(move || {
let stream_write = Arc::clone(&stream_write);
stream_read.start().and_then(move |i| {
let stream_write = Arc::clone(&stream_write);
stream_write.send("foo".to_string()).map(|_| i)
}).collect().map(|r| {
assert_eq!(r, vec!["foo".to_string(), "bar".to_string(), "bazfoo".to_string(), "foo".to_string()])
}).map_err(|_| {
assert_eq!(true, false);
})
}));
}