tokio 的 mpsc 通道上的接收器仅在缓冲区已满时接收消息
Receiver on tokio's mpsc channel only receives messages when buffer is full
我花了几个小时试图解决这个问题,我已经完成了。我发现了一个名字相似的问题,但看起来好像有什么东西在同步阻塞,这让 tokio 很混乱。这很可能是这里的问题,但我完全不知道是什么原因造成的。
这是我的项目的一个精简版本,希望能解决这个问题。
use std::io;
use futures_util::{
SinkExt,
stream::{SplitSink, SplitStream},
StreamExt,
};
use tokio::{
net::TcpStream,
sync::mpsc::{channel, Receiver, Sender},
};
use tokio_tungstenite::{
connect_async,
MaybeTlsStream,
tungstenite::Message,
WebSocketStream,
};
#[tokio::main]
async fn main() {
connect_to_server("wss://a_valid_domain.com".to_string()).await;
}
async fn read_line() -> String {
loop {
let mut str = String::new();
io::stdin().read_line(&mut str).unwrap();
str = str.trim().to_string();
if !str.is_empty() {
return str;
}
}
}
async fn connect_to_server(url: String) {
let (ws_stream, _) = connect_async(url).await.unwrap();
let (write, read) = ws_stream.split();
let (tx, rx) = channel::<ChannelMessage>(100);
tokio::spawn(channel_thread(write, rx));
tokio::spawn(handle_std_input(tx.clone()));
read_messages(read, tx).await;
}
#[derive(Debug)]
enum ChannelMessage {
Text(String),
Close,
}
// PROBLEMATIC FUNCTION
async fn channel_thread(
mut write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
mut rx: Receiver<ChannelMessage>,
) {
while let Some(msg) = rx.recv().await {
println!("{:?}", msg); // This only fires when buffer is full
match msg {
ChannelMessage::Text(text) => write.send(Message::Text(text)).await.unwrap(),
ChannelMessage::Close => {
write.close().await.unwrap();
rx.close();
return;
}
}
}
}
async fn read_messages(
mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
tx: Sender<ChannelMessage>,
) {
while let Some(msg) = read.next().await {
let msg = match msg {
Ok(m) => m,
Err(_) => continue
};
match msg {
Message::Text(m) => println!("{}", m),
Message::Close(_) => break,
_ => {}
}
}
if !tx.is_closed() {
let _ = tx.send(ChannelMessage::Close).await;
}
}
async fn handle_std_input(tx: Sender<ChannelMessage>) {
loop {
let str = read_line().await;
if tx.is_closed() {
break;
}
tx.send(ChannelMessage::Text(str)).await.unwrap();
}
}
如您所见,我要做的是:
- 连接到网络套接字
- 打印来自 websocket 的传出消息
- 将来自 stdin 的任何输入转发到 websocket
- 还有一个自定义心跳解决方案,已被删减
问题在于 channel_thread() 函数。我将 websocket writer 和通道接收器都移到了这个函数中。问题是,它只会在缓冲区已满时循环发送的对象。
我花了很多时间试图解决这个问题,非常感谢任何帮助。
在这里,您在异步上下文中进行阻塞同步调用:
async fn read_line() -> String {
loop {
let mut str = String::new();
io::stdin().read_line(&mut str).unwrap();
// ^^^^^^^^^^^^^^^^^^^
// This is sync+blocking
str = str.trim().to_string();
if !str.is_empty() {
return str;
}
}
}
你永远不会曾经在异步上下文中进行阻塞同步调用,因为这会阻止整个线程运行其他异步任务。您的通道接收器任务也可能已分配给此线程,因此它必须等到所有阻塞调用完成并且调用此函数的任何内容都会返回异步运行时。
Tokio 有 its own async version of stdin
,您应该改用它。
我花了几个小时试图解决这个问题,我已经完成了。我发现了一个名字相似的问题,但看起来好像有什么东西在同步阻塞,这让 tokio 很混乱。这很可能是这里的问题,但我完全不知道是什么原因造成的。
这是我的项目的一个精简版本,希望能解决这个问题。
use std::io;
use futures_util::{
SinkExt,
stream::{SplitSink, SplitStream},
StreamExt,
};
use tokio::{
net::TcpStream,
sync::mpsc::{channel, Receiver, Sender},
};
use tokio_tungstenite::{
connect_async,
MaybeTlsStream,
tungstenite::Message,
WebSocketStream,
};
#[tokio::main]
async fn main() {
connect_to_server("wss://a_valid_domain.com".to_string()).await;
}
async fn read_line() -> String {
loop {
let mut str = String::new();
io::stdin().read_line(&mut str).unwrap();
str = str.trim().to_string();
if !str.is_empty() {
return str;
}
}
}
async fn connect_to_server(url: String) {
let (ws_stream, _) = connect_async(url).await.unwrap();
let (write, read) = ws_stream.split();
let (tx, rx) = channel::<ChannelMessage>(100);
tokio::spawn(channel_thread(write, rx));
tokio::spawn(handle_std_input(tx.clone()));
read_messages(read, tx).await;
}
#[derive(Debug)]
enum ChannelMessage {
Text(String),
Close,
}
// PROBLEMATIC FUNCTION
async fn channel_thread(
mut write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
mut rx: Receiver<ChannelMessage>,
) {
while let Some(msg) = rx.recv().await {
println!("{:?}", msg); // This only fires when buffer is full
match msg {
ChannelMessage::Text(text) => write.send(Message::Text(text)).await.unwrap(),
ChannelMessage::Close => {
write.close().await.unwrap();
rx.close();
return;
}
}
}
}
async fn read_messages(
mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
tx: Sender<ChannelMessage>,
) {
while let Some(msg) = read.next().await {
let msg = match msg {
Ok(m) => m,
Err(_) => continue
};
match msg {
Message::Text(m) => println!("{}", m),
Message::Close(_) => break,
_ => {}
}
}
if !tx.is_closed() {
let _ = tx.send(ChannelMessage::Close).await;
}
}
async fn handle_std_input(tx: Sender<ChannelMessage>) {
loop {
let str = read_line().await;
if tx.is_closed() {
break;
}
tx.send(ChannelMessage::Text(str)).await.unwrap();
}
}
如您所见,我要做的是:
- 连接到网络套接字
- 打印来自 websocket 的传出消息
- 将来自 stdin 的任何输入转发到 websocket
- 还有一个自定义心跳解决方案,已被删减
问题在于 channel_thread() 函数。我将 websocket writer 和通道接收器都移到了这个函数中。问题是,它只会在缓冲区已满时循环发送的对象。
我花了很多时间试图解决这个问题,非常感谢任何帮助。
在这里,您在异步上下文中进行阻塞同步调用:
async fn read_line() -> String {
loop {
let mut str = String::new();
io::stdin().read_line(&mut str).unwrap();
// ^^^^^^^^^^^^^^^^^^^
// This is sync+blocking
str = str.trim().to_string();
if !str.is_empty() {
return str;
}
}
}
你永远不会曾经在异步上下文中进行阻塞同步调用,因为这会阻止整个线程运行其他异步任务。您的通道接收器任务也可能已分配给此线程,因此它必须等到所有阻塞调用完成并且调用此函数的任何内容都会返回异步运行时。
Tokio 有 its own async version of stdin
,您应该改用它。