如何无限期地读取 Rust Tokio 任务中的无界通道?

How to indefinitely read an unbounded channel in a Rust Tokio task?

我假设 .for_each() 可以解决问题,但它 returns 仅来自频道的第一个(未来)项目,然后 returns 如果频道为空。像 .for_each() 在 non-Tokio/future 上下文中那样无限期地读取任务中的频道的方法是什么?

let tx_origs_reader = rx_chan.for_each(move |tx_orig| {
    //save receiver side tx to db
    let mut tx_origs_once = tx_origs_inner.borrow_mut();
    tx_origs_once.push(tx_orig.clone());  
    Ok(())
});
handle.spawn(tx_origs_reader.then(|err| {
    println!("This returns after first item without an error {:?}", err);
    Ok(())
}));

for_each 处理方式是最好的方式,应该有效 - 而且有效!在 Gitter 的 tokio-rs 人员的帮助下(谢谢!)用简单的测试代码调试它是 tx 端的问题。

在我看来,Rust 是如此先进,它实际上知道在这种情况下放弃任务:基于此的日志输出让我感到困惑,实际上认为问题出在 rx 端,尽管它是 tx 端问题一直存在。