当最后一个发送者被删除但接收者仍然活跃时,是否可以保留 Tokio MPSC 中的项目?

Is it possible to preserve items in a Tokio MPSC when the last Sender is dropped, but the Reciever is still active?

我有一个有界的 Tokio MPSC 队列,其中有一个发送者和一个接收者。我在一个任务中将几个项目放入其中(在本例中,数字 1 到 10,包括在内)并在另一个任务中将这些项目拉出。正如预期的那样,接收方处理了所有 10 个值。

然后,我在处理之间添加一些其他异步任务(在本例中,通过取消注释接收循环中的 sleep 调用),现在,当最后一个发送者是完成。

use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(3);
    
    tokio::spawn(async move {
        while let Some(val) = rx.recv().await {
            println!("Recieved {}", val);
            //sleep(Duration::from_secs(1)).await;
        }
        
        println!("Recieve finished");
    });
    
    for i in 1..=10i32 {
        tx.send(i).await.unwrap();
        println!("Sent {}", i);
    }
}

Rust Playground

这是注释掉 sleep 的输出(交错值的顺序有时会改变,但始终打印“已收到 10”):

Sent 1
Recieved 1
Recieved 2
Sent 2
Sent 3
Recieved 3
Recieved 4
Sent 4
Sent 5
Recieved 5
Recieved 6
Sent 6
Sent 7
Recieved 7
Recieved 8
Sent 8
Sent 9
Recieved 9
Recieved 10
Sent 10

这是未注释 sleep 的输出:

Sent 1
Sent 2
Sent 3
Recieved 1
Sent 4
Recieved 2
Sent 5
Recieved 3
Sent 6
Recieved 4
Sent 7
Recieved 5
Sent 8
Recieved 6
Sent 9
Recieved 7
Sent 10

是否有任何方法可以确定放入队列的所有值都由 Reciever 处理,即使在最后一个 Sender 被删除之后(假设 Reciever 不是下降)? close 函数似乎是这样做的,但实际上是相反的(确保在删除 Reciever 之前处理队列内容)。如果没有,是否有替代的异步友好 MPSC 实现可以提供此保证?

如评论中所述,问题是您的程序在接收器线程完成之前退出。请确保在退出之前等待它:

use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel(3);
    
    let join_handle = tokio::spawn(async move {
        while let Some(val) = rx.recv().await {
            println!("Received {}", val);
            //sleep(Duration::from_secs(1)).await;
        }
        
        println!("Receive finished");
    });
    
    for i in 1..=10i32 {
        tx.send(i).await.unwrap();
        println!("Sent {}", i);
    }
    
    std::mem::drop(tx); // Drop the sender so the receiver doesn't listen forever
    join_handle.await.unwrap(); // Wait for the receiver to finish processing
}
Sent 1
Received 1
Received 2
Sent 2
Sent 3
Sent 4
Sent 5
Received 3
Received 4
Received 5
Sent 6
Sent 7
Sent 8
Received 6
Received 7
Received 8
Sent 9
Sent 10
Received 9
Received 10
Receive finished