使用 Tokio 启动多线程

Start multiple threads with Tokio

我正在尝试创建一个基本的 TCP 服务器:

  1. 服务器应该能够向所有连接的客户端广播消息流
  2. 服务器应该能够接收来自所有客户端的命令并处理它们

这是我在 main 函数中得到的:

let (server_tx, server_rx) = mpsc::unbounded();
let state = Arc::new(Mutex::new(Shared::new(server_tx)));

let addr = "127.0.0.1:6142".parse().unwrap();

let listener = TcpListener::bind(&addr).unwrap();

let server = listener.incoming().for_each(move |socket| {
    // Spawn a task to process the connection
    process(socket, state.clone());
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

println!("server running on localhost:6142");

let _messages = server_rx.for_each(|_| {
    // process messages here
    Ok(())
}).map_err(|err| {
    println!("message error = {:?}", err);
});

tokio::run(server);  

(playground)

我正在使用 tokio 存储库中的 chat.rs 示例作为基础。
我在传入的 tcp 消息上向 server_tx 发送数据。
我遇到的麻烦是消费它们。
我"consuming"使用server_rx.for_each(|_| {传入消息流,现在,我如何告诉tokio到运行呢?

tokio::run 接受一个未来,但我有 2 个(可能更多)。我如何将它们组合起来,使它们 运行 并行?

一起加入期货:

let messages = server_rx.for_each(|_| {
    println!("Message broadcasted");
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

tokio::run(server.join(messages).map(|_| ()));

需要 map() 组合器,因为 Join Item 关联类型是一个元组 ((), ()) 并且 tokio::run() 消耗需要 Future::Item 类型 ()

的未来任务