任务间的通道通信
Channel communication between tasks
我正在尝试在一个 hyper service and one tokio 流之间建立基于通道的通信。问题是编译器出现以下错误:
closure is FnOnce
because it moves the variable tx_queue
out of
its environment.
阅读 rustc --explain E0525
提供的解释后,似乎 tokio::sync::mpsc::Sender 实现了 Clone
但没有实现 Copy
(除非我忽略了什么)。
所以我有点卡住了。如何让我的服务通过 tokio::sync::mpsc
频道向 tokio 流发送消息?我确定我错过了一些明显但看不到的东西:/
有问题的代码摘录(根据@E_net4 的要求进行了修改,使其更短):
extern crate hyper;
extern crate tokio;
extern crate tokio_signal;
use futures::Stream;
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};
use futures::sink::Sink;
use futures::sync::{mpsc, oneshot};
use futures::{future, stream};
fn main() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel::<()>();
let (tx_queue, rx_queue) = mpsc::channel(10);
// ----
runtime.spawn(start_queue(rx_queue));
// ----
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
service_fn_ok(move |_: Request<Body>| {
tx_queue.send(1);
Response::new(Body::from("Hello World!"))
})
});
let graceful = http_server
.with_graceful_shutdown(rx1)
.map_err(|err| eprintln!("server error: {}", err))
.and_then(|_| {
dbg!("stopped");
// TODO: stop order queue listener
Ok(())
});
dbg!("HTTP server listening ...");
runtime.spawn(graceful);
// ----
tx1.send(()).unwrap();
dbg!("exited");
}
pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
#[derive(Eq, PartialEq)]
enum Item {
Value(usize),
Tick,
Done,
}
let items = rx
.map(Item::Value)
.chain(stream::once(Ok(Item::Done)))
.take_while(|item| future::ok(*item != Item::Done));
items
.fold(0, |num, _item| {
dbg!("x");
future::ok(num)
})
.map(|_| ())
}
完整代码可在此处获得:https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e
谢谢:)
futures::sync::mpsc::Sender::send
消耗 Sender
并生成一个 Send
对象,这是一个必须 运行 完成才能实际发送数据的未来。如果通道已满,它将阻塞,直到其他人从通道接收。完成后,它会返回 Sender
,您可以使用它来发送更多数据。
在这种情况下,我认为您不能仅使用 Sender
的单个实例来构建代码。您需要克隆它,以便每次调用服务功能时都有新的克隆。注意现在两个闭包都是 move
:
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
// This closure has one instance of tx_queue that was moved-in here.
// Now we make a copy to be moved into the closure below.
let tx_queue = tx_queue.clone();
service_fn_ok(move |_: Request<Body>| {
// This closure has one instance of tx_queue, but it will be called
// multiple times, so it can not consume it. It must make a copy
// before consuming it.
tx_queue.clone().send(111);
Response::new(Body::from("Hello World!"))
})
});
但是,这会给您以下警告:
warning: unused `futures::sink::send::Send` that must be used
正如我所说,send
只是给你一个必须 运行 实际执行发送的未来。如果您忽略 return 值,则不会发生任何事情。在这种情况下,最好 spawn
它作为一个单独的任务(这样它就不会阻止响应客户端)。要生成它,您需要一个来自运行时的执行程序,还必须为内部闭包克隆它:
let executor = runtime.executor();
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
let tx_queue = tx_queue.clone();
let executor = executor.clone();
service_fn_ok(move |_: Request<Body>| {
executor.spawn(tx_queue.clone().send(111).map(|_| ()).map_err(|err| {
// TODO: Handle the error differenty!
panic!("Error in mpsc {:?}", err);
}));
Response::new(Body::from("Hello World!"))
})
});
我正在尝试在一个 hyper service and one tokio 流之间建立基于通道的通信。问题是编译器出现以下错误:
closure is
FnOnce
because it moves the variabletx_queue
out of its environment.
阅读 rustc --explain E0525
提供的解释后,似乎 tokio::sync::mpsc::Sender 实现了 Clone
但没有实现 Copy
(除非我忽略了什么)。
所以我有点卡住了。如何让我的服务通过 tokio::sync::mpsc
频道向 tokio 流发送消息?我确定我错过了一些明显但看不到的东西:/
有问题的代码摘录(根据@E_net4 的要求进行了修改,使其更短):
extern crate hyper;
extern crate tokio;
extern crate tokio_signal;
use futures::Stream;
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};
use futures::sink::Sink;
use futures::sync::{mpsc, oneshot};
use futures::{future, stream};
fn main() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel::<()>();
let (tx_queue, rx_queue) = mpsc::channel(10);
// ----
runtime.spawn(start_queue(rx_queue));
// ----
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(|| {
service_fn_ok(move |_: Request<Body>| {
tx_queue.send(1);
Response::new(Body::from("Hello World!"))
})
});
let graceful = http_server
.with_graceful_shutdown(rx1)
.map_err(|err| eprintln!("server error: {}", err))
.and_then(|_| {
dbg!("stopped");
// TODO: stop order queue listener
Ok(())
});
dbg!("HTTP server listening ...");
runtime.spawn(graceful);
// ----
tx1.send(()).unwrap();
dbg!("exited");
}
pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
#[derive(Eq, PartialEq)]
enum Item {
Value(usize),
Tick,
Done,
}
let items = rx
.map(Item::Value)
.chain(stream::once(Ok(Item::Done)))
.take_while(|item| future::ok(*item != Item::Done));
items
.fold(0, |num, _item| {
dbg!("x");
future::ok(num)
})
.map(|_| ())
}
完整代码可在此处获得:https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e
谢谢:)
futures::sync::mpsc::Sender::send
消耗 Sender
并生成一个 Send
对象,这是一个必须 运行 完成才能实际发送数据的未来。如果通道已满,它将阻塞,直到其他人从通道接收。完成后,它会返回 Sender
,您可以使用它来发送更多数据。
在这种情况下,我认为您不能仅使用 Sender
的单个实例来构建代码。您需要克隆它,以便每次调用服务功能时都有新的克隆。注意现在两个闭包都是 move
:
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
// This closure has one instance of tx_queue that was moved-in here.
// Now we make a copy to be moved into the closure below.
let tx_queue = tx_queue.clone();
service_fn_ok(move |_: Request<Body>| {
// This closure has one instance of tx_queue, but it will be called
// multiple times, so it can not consume it. It must make a copy
// before consuming it.
tx_queue.clone().send(111);
Response::new(Body::from("Hello World!"))
})
});
但是,这会给您以下警告:
warning: unused `futures::sink::send::Send` that must be used
正如我所说,send
只是给你一个必须 运行 实际执行发送的未来。如果您忽略 return 值,则不会发生任何事情。在这种情况下,最好 spawn
它作为一个单独的任务(这样它就不会阻止响应客户端)。要生成它,您需要一个来自运行时的执行程序,还必须为内部闭包克隆它:
let executor = runtime.executor();
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
let tx_queue = tx_queue.clone();
let executor = executor.clone();
service_fn_ok(move |_: Request<Body>| {
executor.spawn(tx_queue.clone().send(111).map(|_| ()).map_err(|err| {
// TODO: Handle the error differenty!
panic!("Error in mpsc {:?}", err);
}));
Response::new(Body::from("Hello World!"))
})
});