Tokio 回显服务器:无法在同一个 future 中同时读写
Tokio echo server. Cannot read and write in the same future
我正在尝试用 Tokio 构建一个回显(echo)服务器。我看过一些示例,但它们似乎都使用了 Tokio IO 中的 `io::copy`,而我不能使用它,因为我想修改输出内容。
但是,我无法让同时使用 `writer` 和 `reader` 的服务器通过编译。我想构建一个基于 futures 的任务,在循环中完成读取和写入(即回显服务器)。
我的实际代码是这样的:
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use futures::prelude::*;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use futures::Stream;
use futures::stream;
use tokio_io::codec::*;
use std::rc::Rc;
// First attempt: echo every received line back through the write half of the
// framed socket, spawning one future per connection on a `CpuPool`.
// NOTE: this snippet is kept exactly as posted and does not compile; the
// compiler error is quoted on the `pool.spawn(action)` line.
fn main() {
// Pool on which the per-connection futures are spawned.
let pool = CpuPool::new_num_cpus();
use std::net::*;
// Bind the listener to 127.0.0.1:8080; a bind failure panics via `unwrap`.
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let listener = tokio::net::TcpListener::bind(&socket).unwrap();
let server = listener.incoming().for_each(|socket| {
// Frame the connection with `LinesCodec` and split it into a writer and a reader.
let (writer, reader) = socket.framed(LinesCodec::new()).split();
// The writer is wrapped in `Rc` so the closure below can refer to it.
let writer = Rc::new(writer);
let action = reader.for_each(|line| {
println!("ECHO: {}", line);
// The value returned by `send` is discarded here.
// NOTE(review): presumably `send` returns a future that has to be driven
// for the line to actually be written — confirm against the `Sink` docs.
writer.send(line);
Ok(())
});
// The compiler rejects this call; the message it reports is quoted in the trailing comment.
pool.spawn(action); // std::rc::Rc<futures::stream::SplitSink<tokio_io::codec::Framed<tokio::net::TcpStream, tokio_io::codec::LinesCodec>>>` cannot be shared between threads safely
Ok(())
});
// Block the current thread until the accept loop finishes or fails.
server.wait().unwrap();
}
你可能会说我必须使用 `Arc`,因为涉及到不同的线程。我试过 `Arc` 和 `Mutex`,但出现了另一个错误,我想不出让它通过编译的办法:
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use futures::prelude::*;
use std::time;
use std::thread;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
use std::sync::Arc;
use std::sync::Mutex;
// Second attempt: same structure as before, but the write half is wrapped in
// `Arc<Mutex<_>>` and moved into the per-line closure.
// NOTE: this snippet is kept exactly as posted and does not compile; the
// compiler error is quoted on the `writer.lock()` line.
fn main() {
// Pool on which the per-connection futures are spawned.
let pool = CpuPool::new_num_cpus();
use std::net::*;
// Bind the listener to 127.0.0.1:8080; a bind failure panics via `unwrap`.
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let listener = tokio::net::TcpListener::bind(&socket).unwrap();
let server = listener.incoming().for_each(|socket| {
// Frame the connection with `LinesCodec` and split it into a writer and a reader.
let (writer, reader) = socket.framed(LinesCodec::new()).split();
// Shared, lockable handle to the writer, moved into the closure below.
let writer = Arc::new(Mutex::new(writer));
let action = reader.for_each(move |line| {
println!("ECHO: {}", line);
// `send` is called through the mutex guard; the compiler rejects this line
// with the message quoted in the trailing comment.
// NOTE(review): presumably `send` takes the sink by value, which a guard
// cannot give away — confirm against the `Sink` docs.
writer.lock().unwrap().send(line); // cannot move out of borrowed content
Ok(())
});
pool.spawn(action);
Ok(())
});
// Block the current thread until the accept loop finishes or fails.
server.wait().unwrap();
}
编译器报告的错误是:`cannot move out of borrowed content`
我最终发现 `forward` 就是我这个问题的答案。
extern crate tokio;
extern crate tokio_io;
extern crate futures;
use futures::prelude::*;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
struct Cancellable{
rx: std::sync::mpsc::Receiver<()>,
}
impl Future for Cancellable {
type Item = ();
type Error = std::sync::mpsc::RecvError;
fn poll(&mut self) -> Result<Async<Self::Item>,Self::Error> {
match self.rx.try_recv() {
Ok(_) => Ok(Async::Ready(())),
Err(_) => Ok(Async::NotReady)
}
}
}
/// Working version: a line-based echo server on 127.0.0.1:8080.
///
/// Each connection's incoming lines are printed and forwarded to the same
/// connection's writer. Receiving the line `bye` additionally sends a message
/// on a channel whose receiver is wrapped in a `Cancellable` that is combined
/// with the forwarding future through `select2`.
fn main() {
    use std::net::*;

    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let listener = tokio::net::TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .for_each(|socket| {
            let (sink, lines) = socket.framed(LinesCodec::new()).split();
            let (cancel_tx, cancel_rx) = std::sync::mpsc::channel();
            let cancel = Cancellable { rx: cancel_rx };

            // Log every line, signal cancellation on "bye", and pass the line
            // on unchanged so it is written back to the peer.
            let echo = lines
                .map(move |line| {
                    println!("ECHO: {}", line);
                    if line == "bye" {
                        println!("BYE");
                        cancel_tx.send(()).unwrap();
                    }
                    line
                })
                .forward(sink);

            // Combine forwarding with the cancel signal, then discard the
            // result and reduce any error to a log line.
            let action = echo
                .select2(cancel)
                .map(|_| ())
                .map_err(|_err| {
                    println!("error");
                });
            tokio::executor::current_thread::spawn(action);
            Ok(())
        })
        .map_err(|err| {
            println!("error = {:?}", err);
        });

    tokio::executor::current_thread::run(|_| {
        tokio::executor::current_thread::spawn(server);
    });
}
我正在尝试用 Tokio 构建一个回显(echo)服务器。我看过一些示例,但它们似乎都使用了 Tokio IO 中的 `io::copy`,而我不能使用它,因为我想修改输出内容。
但是,我无法让同时使用 `writer` 和 `reader` 的服务器通过编译。我想构建一个基于 futures 的任务,在循环中完成读取和写入(即回显服务器)。
我的实际代码是这样的:
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use futures::prelude::*;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use futures::Stream;
use futures::stream;
use tokio_io::codec::*;
use std::rc::Rc;
// First attempt: echo every received line back through the write half of the
// framed socket, spawning one future per connection on a `CpuPool`.
// NOTE: this snippet is kept exactly as posted and does not compile; the
// compiler error is quoted on the `pool.spawn(action)` line.
fn main() {
// Pool on which the per-connection futures are spawned.
let pool = CpuPool::new_num_cpus();
use std::net::*;
// Bind the listener to 127.0.0.1:8080; a bind failure panics via `unwrap`.
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let listener = tokio::net::TcpListener::bind(&socket).unwrap();
let server = listener.incoming().for_each(|socket| {
// Frame the connection with `LinesCodec` and split it into a writer and a reader.
let (writer, reader) = socket.framed(LinesCodec::new()).split();
// The writer is wrapped in `Rc` so the closure below can refer to it.
let writer = Rc::new(writer);
let action = reader.for_each(|line| {
println!("ECHO: {}", line);
// The value returned by `send` is discarded here.
// NOTE(review): presumably `send` returns a future that has to be driven
// for the line to actually be written — confirm against the `Sink` docs.
writer.send(line);
Ok(())
});
// The compiler rejects this call; the message it reports is quoted in the trailing comment.
pool.spawn(action); // std::rc::Rc<futures::stream::SplitSink<tokio_io::codec::Framed<tokio::net::TcpStream, tokio_io::codec::LinesCodec>>>` cannot be shared between threads safely
Ok(())
});
// Block the current thread until the accept loop finishes or fails.
server.wait().unwrap();
}
你可能会说我必须使用 `Arc`,因为涉及到不同的线程。我试过 `Arc` 和 `Mutex`,但出现了另一个错误,我想不出让它通过编译的办法:
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use futures::prelude::*;
use std::time;
use std::thread;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
use std::sync::Arc;
use std::sync::Mutex;
// Second attempt: same structure as before, but the write half is wrapped in
// `Arc<Mutex<_>>` and moved into the per-line closure.
// NOTE: this snippet is kept exactly as posted and does not compile; the
// compiler error is quoted on the `writer.lock()` line.
fn main() {
// Pool on which the per-connection futures are spawned.
let pool = CpuPool::new_num_cpus();
use std::net::*;
// Bind the listener to 127.0.0.1:8080; a bind failure panics via `unwrap`.
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let listener = tokio::net::TcpListener::bind(&socket).unwrap();
let server = listener.incoming().for_each(|socket| {
// Frame the connection with `LinesCodec` and split it into a writer and a reader.
let (writer, reader) = socket.framed(LinesCodec::new()).split();
// Shared, lockable handle to the writer, moved into the closure below.
let writer = Arc::new(Mutex::new(writer));
let action = reader.for_each(move |line| {
println!("ECHO: {}", line);
// `send` is called through the mutex guard; the compiler rejects this line
// with the message quoted in the trailing comment.
// NOTE(review): presumably `send` takes the sink by value, which a guard
// cannot give away — confirm against the `Sink` docs.
writer.lock().unwrap().send(line); // cannot move out of borrowed content
Ok(())
});
pool.spawn(action);
Ok(())
});
// Block the current thread until the accept loop finishes or fails.
server.wait().unwrap();
}
编译器报告的错误是:`cannot move out of borrowed content`
我最终发现 `forward` 就是我这个问题的答案。
extern crate tokio;
extern crate tokio_io;
extern crate futures;
use futures::prelude::*;
use tokio_io::AsyncRead;
use futures::Stream;
use tokio_io::codec::*;
struct Cancellable{
rx: std::sync::mpsc::Receiver<()>,
}
impl Future for Cancellable {
type Item = ();
type Error = std::sync::mpsc::RecvError;
fn poll(&mut self) -> Result<Async<Self::Item>,Self::Error> {
match self.rx.try_recv() {
Ok(_) => Ok(Async::Ready(())),
Err(_) => Ok(Async::NotReady)
}
}
}
/// Working version: a line-based echo server on 127.0.0.1:8080.
///
/// Each connection's incoming lines are printed and forwarded to the same
/// connection's writer. Receiving the line `bye` additionally sends a message
/// on a channel whose receiver is wrapped in a `Cancellable` that is combined
/// with the forwarding future through `select2`.
fn main() {
    use std::net::*;

    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
    let listener = tokio::net::TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .for_each(|socket| {
            let (sink, lines) = socket.framed(LinesCodec::new()).split();
            let (cancel_tx, cancel_rx) = std::sync::mpsc::channel();
            let cancel = Cancellable { rx: cancel_rx };

            // Log every line, signal cancellation on "bye", and pass the line
            // on unchanged so it is written back to the peer.
            let echo = lines
                .map(move |line| {
                    println!("ECHO: {}", line);
                    if line == "bye" {
                        println!("BYE");
                        cancel_tx.send(()).unwrap();
                    }
                    line
                })
                .forward(sink);

            // Combine forwarding with the cancel signal, then discard the
            // result and reduce any error to a log line.
            let action = echo
                .select2(cancel)
                .map(|_| ())
                .map_err(|_err| {
                    println!("error");
                });
            tokio::executor::current_thread::spawn(action);
            Ok(())
        })
        .map_err(|err| {
            println!("error = {:?}", err);
        });

    tokio::executor::current_thread::run(|_| {
        tokio::executor::current_thread::spawn(server);
    });
}