How to share a tokio::net::TcpStream between concurrent tasks?

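I need to send and receive regular data on a single TcpStream while also sending heartbeat data at a fixed interval. My current implementation uses Arc (around a Mutex) for this, but it fails to compile with the errors shown below. How can I fix these errors, or is there another way to achieve the same goal?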

use anyhow::Result;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<()> {
    let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
    let stream = Arc::new(Mutex::new(stream));

    let common_stream = stream.clone();
    let handler1 = tokio::spawn(async {
        loop {
            let mut stream = common_stream.lock().unwrap();
            let mut buf = [0u8; 10];
            stream.read_exact(&mut buf).await.unwrap();
            buf.reverse();
            stream.write(&buf).await.unwrap();
        }
    });

    let heartbeat_stream = stream.clone();
    let handler2 = tokio::spawn(async {
        loop {
            let mut stream = heartbeat_stream.lock().unwrap();
            stream.write_u8(1).await.unwrap();

            thread::sleep(Duration::from_millis(200));
        }
    });

    handler1.await?;
    handler2.await?;

    Ok(())
}
error: future cannot be sent between threads safely
   --> src\main.rs:14:20
    |
14  |     let handler1 = tokio::spawn(async {
    |                    ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
   --> src\main.rs:20:31
    |
16  |             let mut stream = common_stream.lock().unwrap();
    |                 ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
...
20  |             stream.write(&buf).await.unwrap();
    |                               ^^^^^^ await occurs here, with `mut stream` maybe used later
21  |         }
    |         - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
   --> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src\main.rs:25:20
    |
25  |     let handler2 = tokio::spawn(async {
    |                    ^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
   --> src\main.rs:28:31
    |
27  |             let mut stream = heartbeat_stream.lock().unwrap();
    |                 ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
28  |             stream.write_u8(1).await.unwrap();
    |                               ^^^^^^ await occurs here, with `mut stream` maybe used later
...
31  |         }
    |         - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
   --> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

Any advice here is appreciated.

Thanks.

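Your code has a few errors, although the idea behind it is almost sound. You should use the async tools that are available wherever possible. Some of the needed/desired changes:

  • Use tokio::time::sleep because it is async; std::thread::sleep blocks the thread that runs the task
  • Use an async version of the mutex (for example the one from the futures crate); the error comes from std::sync::MutexGuard not being Send while it is held across an .await
  • Use some kind of generic error handling (anyhow helps here)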
use futures::lock::Mutex;
use anyhow::Error;
use tokio::time::sleep;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Error> {
    let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
    let stream = Arc::new(Mutex::new(stream));

    let common_stream = stream.clone();
    let handler1 = tokio::spawn(async move {
        loop {
            let mut stream = common_stream.lock().await;
            let mut buf = [0u8; 10];
            stream.read_exact(&mut buf).await.unwrap();
            buf.reverse();
            // write_all keeps writing until the whole buffer is sent; write may send only part of it
            stream.write_all(&buf).await.unwrap();
        }
    });

    let heartbeat_stream = stream.clone();
    let handler2 = tokio::spawn(async move {
        loop {
            {
                // Hold the lock only for the write so it is released before sleeping
                let mut stream = heartbeat_stream.lock().await;
                stream.write_u8(1).await.unwrap();
            }

            sleep(Duration::from_millis(200)).await;
        }
    });

    handler1.await?;
    handler2.await?;

    Ok(())
}


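Here is a solution that splits the stream into a read half and a write half and runs a loop that:

  • waits for a heartbeat tick and, when it fires, sends one byte to the write half of the stream
  • waits for data (10 bytes) from the read half, reverses it, and writes it back to the write half

In addition, this does not spawn any tasks or threads and does all of the work in the current task, with no locks.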

use anyhow::Result;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8888").await?;
    let (mut read, mut write) = stream.split();
    let mut heartbeat_interval = tokio::time::interval(Duration::from_millis(200));
    let mut buf = [0u8; 10];

    loop {
        tokio::select! {
            _ = heartbeat_interval.tick() => {
                // write_all retries until the whole buffer has been written
                write.write_all(&[1]).await?;
            }

            result = read.read_exact(&mut buf) => {
                let _bytes_read = result?;
                buf.reverse();
                // write_all retries until the whole buffer has been written
                write.write_all(&buf).await?;
            }
        }
    }
}
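
One thing to watch in a select! loop like this: tokio documents read_exact as not cancellation safe, so if the heartbeat branch fires while only part of a 10-byte frame has arrived, the bytes already read can be lost. A possible workaround is to use the cancel-safe read and assemble frames manually. The FrameAssembler type below is a hypothetical helper written for this illustration, not part of tokio; it uses only the standard library so the buffering logic can be checked on its own.

```rust
/// Hypothetical helper: accumulates bytes from partial reads and yields
/// complete fixed-size frames of N bytes.
struct FrameAssembler<const N: usize> {
    buf: [u8; N],
    filled: usize,
}

impl<const N: usize> FrameAssembler<N> {
    fn new() -> Self {
        // A zero-sized frame would never consume input, so reject it up front.
        assert!(N > 0, "frame size must be non-zero");
        Self { buf: [0u8; N], filled: 0 }
    }

    /// Feeds `chunk` in and returns every frame it completes, in arrival order.
    /// Leftover bytes stay buffered until the next call.
    fn push(&mut self, mut chunk: &[u8]) -> Vec<[u8; N]> {
        let mut frames = Vec::new();
        while !chunk.is_empty() {
            let take = (N - self.filled).min(chunk.len());
            self.buf[self.filled..self.filled + take].copy_from_slice(&chunk[..take]);
            self.filled += take;
            chunk = &chunk[take..];
            if self.filled == N {
                frames.push(self.buf);
                self.filled = 0;
            }
        }
        frames
    }
}

fn main() {
    let mut assembler = FrameAssembler::<10>::new();
    // Simulate one 10-byte frame arriving in two pieces, as could happen
    // when a heartbeat tick interrupts the wait for data.
    assert!(assembler.push(&[0, 1, 2, 3]).is_empty());
    let frames = assembler.push(&[4, 5, 6, 7, 8, 9]);
    for mut frame in frames {
        frame.reverse();
        println!("{:?}", frame);
    }
}
```

In the loop above, the read branch would then become something like a call to read.read(&mut tmp) on a scratch buffer, followed by assembler.push(&tmp[..n]) and one reversed write_all per returned frame; a result of n == 0 means the peer closed the connection.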