如何异步检索数据并使用基于 Tokio 的回显服务器对其进行修改?
How can I asynchronously retrieve data and modify it with a Tokio-based echo server?
我正在开发一个回声服务器,它从 TCP 获取数据并对这些数据应用一些逻辑。例如,如果客户端数据以 hello
形式出现,我想将其响应为 hello from server
.
我可以使用 copy
函数转发输入数据,但这对我来说没有用。
这是我正在处理的起始代码:
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::stream::Stream;
use futures::Future;
use std::net::SocketAddr;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_io::io::copy;
use tokio_io::AsyncRead;
fn main() {
let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let done = socket.incoming().for_each(move |(socket, addr)| {
let (reader, writer) = socket.split();
let amt = copy(reader, writer);
let msg = amt.then(move |result| {
match result {
Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
Err(e) => println!("error on {}: {}", addr, e),
}
Ok(())
});
handle.spawn(msg);
Ok(())
});
core.run(done).unwrap();
}
我知道我需要添加一些逻辑而不是这个复制函数,但是怎么做呢?
let amt = copy(reader, writer);
从某种意义上说,回显服务器有点特殊,来自客户端的 "request" 之后恰好是来自服务器的一个响应。这种用例的一个很好的例子是 tokio 的 TinyDB example.
然而,应该考虑的一件事是,虽然 UDP 是基于数据包的,但它以您发送它们时所用的确切形式到达另一端,而 TCP 则不是。 TCP 是一种流协议 - 它有很强的保证,即另一方接收到数据包,并且发送的数据完全按照发送的顺序接收。但是,不能保证的是,调用 [=一侧的 18=] 导致另一侧恰好一个 "receive" 调用,返回与发送的完全相同的数据块。这在发送非常长的数据块时尤其有用,其中一个发送映射到多个接收。因此,您应该满足于服务器在尝试向客户端发送响应之前可以等待的分隔符。在 Telnet 中,该分隔符将是“\r\n”。
这就是 tokio 的 Decoder/Encoder 基础设施发挥作用的地方。这种编解码器的示例实现是 LinesCodec。如果你想拥有
Telnet,这正是您想要的。它一次只会给你一条消息,并允许你一次只发送一条这样的消息作为回复:
extern crate tokio;
use tokio::codec::Decoder;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::codec::LinesCodec;
use std::net::SocketAddr;
fn main() {
let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();
let socket = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);
let done = socket.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
// Fit the line-based codec on top of the socket. This will take on the task of
// parsing incomming messages, as well as formatting outgoing ones (appending \r\n).
let (lines_tx, lines_rx) = LinesCodec::new().framed(socket).split();
// This takes every incomming message and allows to create one outgoing message for it,
// essentially generating a stream of responses.
let responses = lines_rx.map(|incomming_message| {
// Implement whatever transform rules here
if incomming_message == "hello" {
return String::from("hello from server");
}
return incomming_message;
});
// At this point `responses` is a stream of `Response` types which we
// now want to write back out to the client. To do that we use
// `Stream::fold` to perform a loop here, serializing each response and
// then writing it out to the client.
let writes = responses.fold(lines_tx, |writer, response| {
//Return the future that handles to send the response to the socket
writer.send(response)
});
// Run this request/response loop until the client closes the connection
// Then return Ok(()), ignoring all eventual errors.
tokio::spawn(
writes.then(move |_| Ok(()))
);
return Ok(());
});
tokio::run(done);
}
我正在开发一个回声服务器,它从 TCP 获取数据并对这些数据应用一些逻辑。例如,如果客户端数据以 hello
形式出现,我想将其响应为 hello from server
.
我可以使用 copy
函数转发输入数据,但这对我来说没有用。
这是我正在处理的起始代码:
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::stream::Stream;
use futures::Future;
use std::net::SocketAddr;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_io::io::copy;
use tokio_io::AsyncRead;
fn main() {
let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let done = socket.incoming().for_each(move |(socket, addr)| {
let (reader, writer) = socket.split();
let amt = copy(reader, writer);
let msg = amt.then(move |result| {
match result {
Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
Err(e) => println!("error on {}: {}", addr, e),
}
Ok(())
});
handle.spawn(msg);
Ok(())
});
core.run(done).unwrap();
}
我知道我需要添加一些逻辑而不是这个复制函数,但是怎么做呢?
let amt = copy(reader, writer);
从某种意义上说,回显服务器有点特殊,来自客户端的 "request" 之后恰好是来自服务器的一个响应。这种用例的一个很好的例子是 tokio 的 TinyDB example.
然而,应该考虑的一件事是,虽然 UDP 是基于数据包的,但它以您发送它们时所用的确切形式到达另一端,而 TCP 则不是。 TCP 是一种流协议 - 它有很强的保证,即另一方接收到数据包,并且发送的数据完全按照发送的顺序接收。但是,不能保证的是,调用 [=一侧的 18=] 导致另一侧恰好一个 "receive" 调用,返回与发送的完全相同的数据块。这在发送非常长的数据块时尤其有用,其中一个发送映射到多个接收。因此,您应该满足于服务器在尝试向客户端发送响应之前可以等待的分隔符。在 Telnet 中,该分隔符将是“\r\n”。 这就是 tokio 的 Decoder/Encoder 基础设施发挥作用的地方。这种编解码器的示例实现是 LinesCodec。如果你想拥有 Telnet,这正是您想要的。它一次只会给你一条消息,并允许你一次只发送一条这样的消息作为回复:
extern crate tokio;
use tokio::codec::Decoder;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::codec::LinesCodec;
use std::net::SocketAddr;
fn main() {
let addr = "127.0.0.1:15000".parse::<SocketAddr>().unwrap();
let socket = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);
let done = socket.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
// Fit the line-based codec on top of the socket. This will take on the task of
// parsing incomming messages, as well as formatting outgoing ones (appending \r\n).
let (lines_tx, lines_rx) = LinesCodec::new().framed(socket).split();
// This takes every incomming message and allows to create one outgoing message for it,
// essentially generating a stream of responses.
let responses = lines_rx.map(|incomming_message| {
// Implement whatever transform rules here
if incomming_message == "hello" {
return String::from("hello from server");
}
return incomming_message;
});
// At this point `responses` is a stream of `Response` types which we
// now want to write back out to the client. To do that we use
// `Stream::fold` to perform a loop here, serializing each response and
// then writing it out to the client.
let writes = responses.fold(lines_tx, |writer, response| {
//Return the future that handles to send the response to the socket
writer.send(response)
});
// Run this request/response loop until the client closes the connection
// Then return Ok(()), ignoring all eventual errors.
tokio::spawn(
writes.then(move |_| Ok(()))
);
return Ok(());
});
tokio::run(done);
}