发送到数组中的每个 futures::sync::mpsc::Sender
Send to each futures::sync::mpsc::Sender in array
我有一个 futures::sync::mpsc::Sender
的动态集合,我想为每个传入连接发送一条消息。
我让它与 UnboundedSender
一起工作,因为我可以做到这一点(见下文)但是 Sender
会自行消耗,所以我需要将其移除并重新插入 Vec
发送后。我怎样才能做到这一点?如果 Sender
阻塞,它不应该发送更多消息,而是切换到处理接收器上的传入连接。
下面是 UnboundedSender
的实现,我尝试这样做的失败被内联注释掉了(只需用注释掉的一行替换前面的行)。
UnboundedSender(有效)
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::UnboundedSender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::unbounded();
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))
},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.unbounded_send(x).unwrap();
}
},
}
Ok(())
});
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
发件人(不起作用)
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::Sender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))
},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.send(x);
//^error[E0507]: cannot move out of borrowed content
}
},
}
Ok(())
});
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
AFAIK,你有两个主要问题,send()
取得了 Sender
的所有权,所以你必须克隆某个地方,如果你想以后重用它,而且它 returns 未来你必须以某种方式处理。
有多种方法可以解决这些问题,这里是一种:
extern crate futures;
extern crate tokio;
use futures::sync::mpsc;
use futures::Future;
use futures::{stream, Sink, Stream};
fn main() {
let values = vec![1i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1]; // remove cast syntax
let mut senders = vec![]; // remove annotations
let stream = stream::iter_ok(values).for_each(move |v| { // move senders
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
tokio::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}));
}
-1 => {
println!("Closing channels");
senders.clear();
}
x => {
for sender in senders.iter() {
let send = sender
.clone() // clone sender
.send(x)
.map(move |_| println!("Sending {}", x))
.map_err(|e| eprintln!("error = {:?}", e));
tokio::spawn(send); // spawn the task
}
}
}
Ok(())
});
tokio::run(stream);
println!("Done!");
}
我想我已经解决了 - 诀窍是传递 senders
并继续将其传递到期货链中。这不处理 -1
以清除发件人,但扩展很简单。
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink, Future, IntoFuture};
use futures::sync::mpsc;
use futures::future::Either;
fn main() {
let values = vec![0, 1, 0, 2, 3];
let stream = stream::iter_ok::<Vec<i8>, mpsc::SendError<i8>>(values)
.fold(Vec::new(), |mut senders, v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(0);
senders.push(sender);
let idx = senders.len();
current_thread::spawn(receiver.for_each(move |v| {
println!("Received {} in channel {}", v, idx);
Ok(())
}));
Either::A(Ok(senders).into_future())
},
value => {
println!("Sending {}...", value);
Either::B(stream::iter_ok(senders).and_then(move |tx| {
tx.send(value)
}).collect().map(move |senders| {
println!("Sent {}.", value);
senders
}))
},
}
}).map(drop);
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
这输出:
Adding channel
Sending 1...
Received 1 in channel 1
Sent 1.
Adding channel
Sending 2...
Received 2 in channel 1
Received 2 in channel 2
Sent 2.
Sending 3...
Received 3 in channel 1
Received 3 in channel 2
Sent 3.
Done!
我有一个 futures::sync::mpsc::Sender
的动态集合,我想为每个传入连接发送一条消息。
我让它与 UnboundedSender
一起工作,因为我可以做到这一点(见下文)但是 Sender
会自行消耗,所以我需要将其移除并重新插入 Vec
发送后。我怎样才能做到这一点?如果 Sender
阻塞,它不应该发送更多消息,而是切换到处理接收器上的传入连接。
下面是 UnboundedSender
的实现,我尝试这样做的失败被内联注释掉了(只需用注释掉的一行替换前面的行)。
UnboundedSender(有效)
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::UnboundedSender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::unbounded();
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))
},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.unbounded_send(x).unwrap();
}
},
}
Ok(())
});
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
发件人(不起作用)
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
fn main() {
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::Sender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))
},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
for sender in senders.iter() {
println!("Sending {}", x);
sender.send(x);
//^error[E0507]: cannot move out of borrowed content
}
},
}
Ok(())
});
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
AFAIK,你有两个主要问题,send()
取得了 Sender
的所有权,所以你必须克隆某个地方,如果你想以后重用它,而且它 returns 未来你必须以某种方式处理。
有多种方法可以解决这些问题,这里是一种:
extern crate futures;
extern crate tokio;
use futures::sync::mpsc;
use futures::Future;
use futures::{stream, Sink, Stream};
fn main() {
let values = vec![1i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1]; // remove cast syntax
let mut senders = vec![]; // remove annotations
let stream = stream::iter_ok(values).for_each(move |v| { // move senders
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
tokio::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}));
}
-1 => {
println!("Closing channels");
senders.clear();
}
x => {
for sender in senders.iter() {
let send = sender
.clone() // clone sender
.send(x)
.map(move |_| println!("Sending {}", x))
.map_err(|e| eprintln!("error = {:?}", e));
tokio::spawn(send); // spawn the task
}
}
}
Ok(())
});
tokio::run(stream);
println!("Done!");
}
我想我已经解决了 - 诀窍是传递 senders
并继续将其传递到期货链中。这不处理 -1
以清除发件人,但扩展很简单。
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink, Future, IntoFuture};
use futures::sync::mpsc;
use futures::future::Either;
fn main() {
let values = vec![0, 1, 0, 2, 3];
let stream = stream::iter_ok::<Vec<i8>, mpsc::SendError<i8>>(values)
.fold(Vec::new(), |mut senders, v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(0);
senders.push(sender);
let idx = senders.len();
current_thread::spawn(receiver.for_each(move |v| {
println!("Received {} in channel {}", v, idx);
Ok(())
}));
Either::A(Ok(senders).into_future())
},
value => {
println!("Sending {}...", value);
Either::B(stream::iter_ok(senders).and_then(move |tx| {
tx.send(value)
}).collect().map(move |senders| {
println!("Sent {}.", value);
senders
}))
},
}
}).map(drop);
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
这输出:
Adding channel
Sending 1...
Received 1 in channel 1
Sent 1.
Adding channel
Sending 2...
Received 2 in channel 1
Received 2 in channel 2
Sent 2.
Sending 3...
Received 3 in channel 1
Received 3 in channel 2
Sent 3.
Done!