如何无限期地读取 Rust Tokio 任务中的无界通道?
How to indefinitely read an unbounded channel in a Rust Tokio task?
我假设 .for_each()
可以解决问题,但它 returns 仅来自频道的第一个(未来)项目,然后 returns 如果频道为空。像 .for_each()
在 non-Tokio/future 上下文中那样无限期地读取任务中的频道的方法是什么?
let tx_origs_reader = rx_chan.for_each(move |tx_orig| {
//save receiver side tx to db
let mut tx_origs_once = tx_origs_inner.borrow_mut();
tx_origs_once.push(tx_orig.clone());
Ok(())
});
handle.spawn(tx_origs_reader.then(|err| {
println!("This returns after first item without an error {:?}", err);
Ok(())
}));
for_each
处理方式是最好的方式,应该有效 - 而且有效!在 Gitter 的 tokio-rs 人员的帮助下(谢谢!)用简单的测试代码调试它是 tx 端的问题。
在我看来,Rust 是如此先进,它实际上知道在这种情况下放弃任务:基于此的日志输出让我感到困惑,实际上认为问题出在 rx 端,尽管它是 tx 端问题一直存在。
我假设 .for_each()
可以解决问题,但它 returns 仅来自频道的第一个(未来)项目,然后 returns 如果频道为空。像 .for_each()
在 non-Tokio/future 上下文中那样无限期地读取任务中的频道的方法是什么?
let tx_origs_reader = rx_chan.for_each(move |tx_orig| {
//save receiver side tx to db
let mut tx_origs_once = tx_origs_inner.borrow_mut();
tx_origs_once.push(tx_orig.clone());
Ok(())
});
handle.spawn(tx_origs_reader.then(|err| {
println!("This returns after first item without an error {:?}", err);
Ok(())
}));
for_each
处理方式是最好的方式,应该有效 - 而且有效!在 Gitter 的 tokio-rs 人员的帮助下(谢谢!)用简单的测试代码调试它是 tx 端的问题。
在我看来,Rust 是如此先进,它实际上知道在这种情况下放弃任务:基于此的日志输出让我感到困惑,实际上认为问题出在 rx 端,尽管它是 tx 端问题一直存在。