如何使用 Tokio 实现基于拉取的系统?

How can I implement a pull-based system using Tokio?

我想在服务器和客户端之间实现一个基于拉的系统,其中服务器只会在客户端请求时推送数据。

我在玩 Tokio 并且能够创建一个基于推送的系统,我能够以 1 毫秒的间隔推送一个字符串。

let done = listener
    .incoming()
    .for_each(move |socket| {
        let server_queue = _cqueue.clone();
        let (reader, mut writer) = socket.split();
        let sender = Interval::new_interval(std::time::Duration::from_millis(1))
            .for_each(move |_| {
                writer
                    .poll_write(server_queue.pull().borrow())
                    .map_err(|_| {
                        tokio::timer::Error::shutdown();
                    })
                    .unwrap();
                return Ok(());
            })
            .map_err(|e| println!("{}", e));
        ;
        tokio::spawn(sender);
        return Ok(());
    })
    .map_err(|e| println!("Future_error {}", e));

有没有办法只在客户要求时才发送,而不必使用 reader?

让我们回想一下可能导致这种情况的事件类型 "sending of data"。你可以想出多种办法:

  • 客户端连接到服务器。根据合同,这是 "asking for data"。你已经实现了这个案例
  • 客户端在连接客户端和服务器的 socket/pipe 上发送带内消息。为此,您需要使用 socketAsyncRead 部分,以及您已经使用过的 AsyncWrite 部分,并构建一个双工通道,以便您可以同时阅读和交谈
  • 客户端发送带外消息,通常在另一个原型主机端口三元组上并使用不同的协议。您当前的服务器识别它,并向客户端发送该数据。为此,您需要一个 reader 用于另一个三元组,并且您需要一个消息传递结构来将其中继到可以访问套接字 AsyncWrite 部分的一个地方

简短的回答是否定的,您不能真正对您没有监听的事件采取行动。


@Shepmaster I was just wondering if there was an existing library that can be used to handle this "neatly"

有,然后没有。

大多数图书馆都以特定问题为中心。在您的情况下,您选择了通过使用 TCP 套接字(实现 AsyncRead + AsyncWrite)在尽可​​能低的级别上工作。

做任何事情,你需要决定:

  1. 一种传输格式
  2. 协议

当我需要快速而肮脏的双工流实现时,我倾向于将代码包装到其中:

use futures::sync::mpsc::{UnboundedSender, unbounded};
use std::sync::{Arc};
use futures::{Sink, Stream, Future, future, stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::codec::{Framed, Encoder, Decoder};
use std::io;
use std::fmt::Debug;
use futures_locks::{RwLock as FutLock};

enum Message<T:Send+Debug+'static> {
    Content(T),
    Done
}

impl<T: Send + Debug + 'static> From<T> for Message<T> {
    fn from(message:T) -> Message<T> {
        Message::Content(message)
    }
}

struct DuplexStream<T:Send+Debug+'static> {
    writer: Arc<FutLock<UnboundedSender<Message<T>>>>,
    handlers: Arc<FutLock<Option<Box<dyn Stream<Item = Message<T>, Error = ()> + Send>>>>
}

impl<T:Send+Debug+'static> DuplexStream<T> {

    pub fn from<R,U>(framed_socket: Framed<R, U>) -> Arc<DuplexStream<T>>
        where U: Send + Encoder<Item = T> + Decoder<Item = T> + 'static, R: Send + AsyncRead + AsyncWrite + 'static {

        let (tx, rx) = framed_socket.split();

        // Assemble the combined upstream stream
        let (upstream_tx, upstream_rx) = unbounded();
        let upstream = upstream_rx.take_while(|item| match item {
            Message::Done => future::ok(false),
            _ => future::ok(true)
        }).fold(tx, |o, m| {
            o.send(match m {
                Message::Content(i) => i,
                _ => unreachable!()
            }).map_err(|_| {
                ()
            })
        }).map(|e| {
            Message::Done
        }).into_stream();

        // Assemble the downstream stream
        let downstream = rx.map_err(|_| ()).map(|r| {
            Message::Content(r)
        }).chain(stream::once(Ok(Message::Done)));

        Arc::new(DuplexStream {
            writer: Arc::new(FutLock::new(upstream_tx)),
            handlers: Arc::new(FutLock::new(Some(Box::new(upstream.select(downstream).take_while(|m| match m {
                Message::Content(_) => {
                    future::ok(true)
                },
                Message::Done => {
                    future::ok(false)
                }
            })))))
        })
    }

    pub fn start(self: Arc<Self>) -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
        Box::new(self.handlers
            .write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))
            .map(|mut handler| -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
                match handler.take() {
                    Some(e) => Box::new(e.map(|r| match r {
                        Message::Content(i) => i,
                        _ => unreachable!()
                    }).map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))),
                    None => Box::new(stream::once(Err(io::Error::new(io::ErrorKind::AddrInUse, "Handler already taken"))))
                }
            }).into_stream().flatten()
        )
    }

    pub fn close(self: Arc<Self>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(Message::Done)
    }
    pub fn send(self: Arc<Self>, message: T) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(message.into())
    }
    pub fn inner_send(self: Arc<Self>, message: Message<T>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        Box::new(self.writer.write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "The mutex has disappeared")).and_then(|guard| {
                future::result(guard.unbounded_send(message).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "The sink has gone away")))
        }))
    }
}

这个结构有很多优点,但也有一些缺点。主要优点是您可以像处理另一种语言一样处理同一对象 上的读写部分。对象本身实现了 Clone(因为它是一个 Arc),每个方法都可以在任何地方使用(对旧的 futures 代码特别有用)并且只要你保留它的副本 某处并且不调用close()它会保持运行(只要底层AsyncRead + AsyncWrite实现仍然存在)。

这并不能免除您第 1 点和第 2 点的责任,但您可以(并且应该)利用 tokio::codec::Framed 实现第 1 点,并将第 2 点实现为业务逻辑。

用法示例(实际上是测试 ;-) ):

#[test]
fn it_writes() {
    let stream = DuplexStream::from(make_w());
    let stream_write = Arc::clone(&stream);
    let stream_read=  Arc::clone(&stream);
    let dup = Arc::clone(&stream);
    tokio::run(lazy(move || {
        let stream_write = Arc::clone(&stream_write);
        stream_read.start().and_then(move |i| {
            let stream_write = Arc::clone(&stream_write);
            stream_write.send("foo".to_string()).map(|_| i)
        }).collect().map(|r| {
            assert_eq!(r, vec!["foo".to_string(), "bar".to_string(), "bazfoo".to_string(), "foo".to_string()])
        }).map_err(|_| {
            assert_eq!(true, false);
        })
    }));
}