我可以维护一个 TcpStreams 的 Vec 以在其中一个读取新输入时跨线程写入它们吗?
Can I maintain a Vec of TcpStreams to write to them across threads whenever one of them reads a new input?
我想进行消息广播:当其中一个客户端发送消息时,服务器将其写入每个套接字。我的主要问题是我不知道如何将 Vec
发送到线程。我不能使用 Mutex
因为这会锁定其他线程对 Vec
的访问以供读取。我无法克隆和发送,因为 TcpStream
无法克隆和发送。这是我到目前为止的尝试
use std::net::{TcpStream, TcpListener};
use std::io::prelude::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::cell::RefCell;
type StreamSet = Arc<RefCell<Vec<TcpStream>>>;
type StreamReceiver = Arc<Mutex<Receiver<StreamSet>>>;
fn main() {
let listener = TcpListener::bind("0.0.0.0:8000").unwrap();
let mut connection_set: StreamSet = Arc::new(RefCell::new(vec![]));
let mut id = 0;
let (tx, rx) = channel();
let rx = Arc::new(Mutex::new(rx));
for stream in listener.incoming() {
let receiver = rx.clone();
let stream = stream.unwrap();
(*connection_set).borrow_mut().push(stream);
println!("A connection established with client {}", id);
thread::spawn(move || handle_connection(receiver, id));
id += 1;
tx.send(connection_set.clone()).unwrap();
}
}
fn handle_connection(rx: StreamReceiver, id: usize) {
let streams;
{
streams = *(rx.lock().unwrap().recv().unwrap()).borrow();
}
let mut connection = &streams[id];
loop {
let mut buffer = [0; 512];
if let Err(_) = connection.read(&mut buffer) {
break;
};
println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
if let Err(_) = connection.write(&buffer[..]) {
break;
};
if let Err(_) = connection.flush() {
break;
};
}
}
另一个想法是为每个套接字生成一个 "controller" 线程 和 一个线程。每个线程都将拥有套接字并有一个通道将数据发送回控制器。控制器将拥有 Vec
个通道以发送到每个线程。当线程接收到数据时,您将其发送到控制器,控制器复制它并将其发送回每个工作线程。您可以将数据包装在 Arc
中以防止不必要的重复,并且您应该提供一个 ID 以避免将数据回传给原始发件人(如果需要)。
这将所有权完全转移到一个线程中,这应该可以避免您遇到的问题。
您可能还希望查看 Tokio,它应该允许您执行类似的操作,但无需在 1-1 映射中生成线程。
I can't use Mutex
because that will lock the access of other threads
您可以随时尝试不同的锁定机制,例如 RwLock
。
because TcpStream
can't be cloned
当然可以:TcpStream::try_clone
.
我想进行消息广播:当其中一个客户端发送消息时,服务器将其写入每个套接字。我的主要问题是我不知道如何将 Vec
发送到线程。我不能使用 Mutex
因为这会锁定其他线程对 Vec
的访问以供读取。我无法克隆和发送,因为 TcpStream
无法克隆和发送。这是我到目前为止的尝试
use std::net::{TcpStream, TcpListener};
use std::io::prelude::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::cell::RefCell;
type StreamSet = Arc<RefCell<Vec<TcpStream>>>;
type StreamReceiver = Arc<Mutex<Receiver<StreamSet>>>;
fn main() {
let listener = TcpListener::bind("0.0.0.0:8000").unwrap();
let mut connection_set: StreamSet = Arc::new(RefCell::new(vec![]));
let mut id = 0;
let (tx, rx) = channel();
let rx = Arc::new(Mutex::new(rx));
for stream in listener.incoming() {
let receiver = rx.clone();
let stream = stream.unwrap();
(*connection_set).borrow_mut().push(stream);
println!("A connection established with client {}", id);
thread::spawn(move || handle_connection(receiver, id));
id += 1;
tx.send(connection_set.clone()).unwrap();
}
}
fn handle_connection(rx: StreamReceiver, id: usize) {
let streams;
{
streams = *(rx.lock().unwrap().recv().unwrap()).borrow();
}
let mut connection = &streams[id];
loop {
let mut buffer = [0; 512];
if let Err(_) = connection.read(&mut buffer) {
break;
};
println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
if let Err(_) = connection.write(&buffer[..]) {
break;
};
if let Err(_) = connection.flush() {
break;
};
}
}
另一个想法是为每个套接字生成一个 "controller" 线程 和 一个线程。每个线程都将拥有套接字并有一个通道将数据发送回控制器。控制器将拥有 Vec
个通道以发送到每个线程。当线程接收到数据时,您将其发送到控制器,控制器复制它并将其发送回每个工作线程。您可以将数据包装在 Arc
中以防止不必要的重复,并且您应该提供一个 ID 以避免将数据回传给原始发件人(如果需要)。
这将所有权完全转移到一个线程中,这应该可以避免您遇到的问题。
您可能还希望查看 Tokio,它应该允许您执行类似的操作,但无需在 1-1 映射中生成线程。
I can't use
Mutex
because that will lock the access of other threads
您可以随时尝试不同的锁定机制,例如 RwLock
。
because
TcpStream
can't be cloned
当然可以:TcpStream::try_clone
.