当最后一个发送者被删除但接收者仍然活跃时,是否可以保留 Tokio MPSC 中的项目?
Is it possible to preserve items in a Tokio MPSC when the last Sender is dropped, but the Reciever is still active?
我有一个有界的 Tokio MPSC 队列,其中有一个发送者和一个接收者。我在一个任务中将几个项目放入其中(在本例中,数字 1 到 10,包括在内)并在另一个任务中将这些项目拉出。正如预期的那样,接收方处理了所有 10 个值。
然后,我在处理之间添加一些其他异步任务(在本例中,通过取消注释接收循环中的 sleep
调用),现在,当最后一个发送者是完成。
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = mpsc::channel(3);
tokio::spawn(async move {
while let Some(val) = rx.recv().await {
println!("Recieved {}", val);
//sleep(Duration::from_secs(1)).await;
}
println!("Recieve finished");
});
for i in 1..=10i32 {
tx.send(i).await.unwrap();
println!("Sent {}", i);
}
}
这是注释掉 sleep
的输出(交错值的顺序有时会改变,但始终打印“已收到 10”):
Sent 1
Recieved 1
Recieved 2
Sent 2
Sent 3
Recieved 3
Recieved 4
Sent 4
Sent 5
Recieved 5
Recieved 6
Sent 6
Sent 7
Recieved 7
Recieved 8
Sent 8
Sent 9
Recieved 9
Recieved 10
Sent 10
这是未注释 sleep
的输出:
Sent 1
Sent 2
Sent 3
Recieved 1
Sent 4
Recieved 2
Sent 5
Recieved 3
Sent 6
Recieved 4
Sent 7
Recieved 5
Sent 8
Recieved 6
Sent 9
Recieved 7
Sent 10
是否有任何方法可以确定放入队列的所有值都由 Reciever
处理,即使在最后一个 Sender
被删除之后(假设 Reciever
不是下降)? close
函数似乎是这样做的,但实际上是相反的(确保在删除 Reciever
之前处理队列内容)。如果没有,是否有替代的异步友好 MPSC 实现可以提供此保证?
如评论中所述,问题是您的程序在接收器线程完成之前退出。请确保在退出之前等待它:
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = mpsc::channel(3);
let join_handle = tokio::spawn(async move {
while let Some(val) = rx.recv().await {
println!("Received {}", val);
//sleep(Duration::from_secs(1)).await;
}
println!("Receive finished");
});
for i in 1..=10i32 {
tx.send(i).await.unwrap();
println!("Sent {}", i);
}
std::mem::drop(tx); // Drop the sender so the receiver doesn't listen forever
join_handle.await.unwrap(); // Wait for the receiver to finish processing
}
Sent 1
Received 1
Received 2
Sent 2
Sent 3
Sent 4
Sent 5
Received 3
Received 4
Received 5
Sent 6
Sent 7
Sent 8
Received 6
Received 7
Received 8
Sent 9
Sent 10
Received 9
Received 10
Receive finished
我有一个有界的 Tokio MPSC 队列,其中有一个发送者和一个接收者。我在一个任务中将几个项目放入其中(在本例中,数字 1 到 10,包括在内)并在另一个任务中将这些项目拉出。正如预期的那样,接收方处理了所有 10 个值。
然后,我在处理之间添加一些其他异步任务(在本例中,通过取消注释接收循环中的 sleep
调用),现在,当最后一个发送者是完成。
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = mpsc::channel(3);
tokio::spawn(async move {
while let Some(val) = rx.recv().await {
println!("Recieved {}", val);
//sleep(Duration::from_secs(1)).await;
}
println!("Recieve finished");
});
for i in 1..=10i32 {
tx.send(i).await.unwrap();
println!("Sent {}", i);
}
}
这是注释掉 sleep
的输出(交错值的顺序有时会改变,但始终打印“已收到 10”):
Sent 1
Recieved 1
Recieved 2
Sent 2
Sent 3
Recieved 3
Recieved 4
Sent 4
Sent 5
Recieved 5
Recieved 6
Sent 6
Sent 7
Recieved 7
Recieved 8
Sent 8
Sent 9
Recieved 9
Recieved 10
Sent 10
这是未注释 sleep
的输出:
Sent 1
Sent 2
Sent 3
Recieved 1
Sent 4
Recieved 2
Sent 5
Recieved 3
Sent 6
Recieved 4
Sent 7
Recieved 5
Sent 8
Recieved 6
Sent 9
Recieved 7
Sent 10
是否有任何方法可以确定放入队列的所有值都由 Reciever
处理,即使在最后一个 Sender
被删除之后(假设 Reciever
不是下降)? close
函数似乎是这样做的,但实际上是相反的(确保在删除 Reciever
之前处理队列内容)。如果没有,是否有替代的异步友好 MPSC 实现可以提供此保证?
如评论中所述,问题是您的程序在接收器线程完成之前退出。请确保在退出之前等待它:
use tokio::sync::mpsc;
use tokio::time::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = mpsc::channel(3);
let join_handle = tokio::spawn(async move {
while let Some(val) = rx.recv().await {
println!("Received {}", val);
//sleep(Duration::from_secs(1)).await;
}
println!("Receive finished");
});
for i in 1..=10i32 {
tx.send(i).await.unwrap();
println!("Sent {}", i);
}
std::mem::drop(tx); // Drop the sender so the receiver doesn't listen forever
join_handle.await.unwrap(); // Wait for the receiver to finish processing
}
Sent 1
Received 1
Received 2
Sent 2
Sent 3
Sent 4
Sent 5
Received 3
Received 4
Received 5
Sent 6
Sent 7
Sent 8
Received 6
Received 7
Received 8
Sent 9
Sent 10
Received 9
Received 10
Receive finished