如何在并发任务之间共享 tokio::net::TcpStream?
How to share tokio::net::TcpStream on concurrency?
我有一个需求:在同一个 TcpStream 上收发正常数据,同时定期发送心跳数据。在当前的实现中,我使用了 Arc<Mutex<TcpStream>> 来达到这个目的,但编译时出现了错误。如何修复这些错误,或者是否有其他方法可以达到相同的目的?
use anyhow::Result;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
// The question's original attempt: connect to 127.0.0.1:8888 and share the one
// TcpStream between two spawned tasks through Arc<std::sync::Mutex<TcpStream>>.
// NOTE: this version does not compile. rustc reports that both spawned futures
// are not `Send`, because a std `MutexGuard` is kept alive across an `.await`.
#[tokio::main]
async fn main() -> Result<()> {
let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
// Shared ownership (Arc) plus a blocking std mutex around the stream.
let stream = Arc::new(Mutex::new(stream));
let common_stream = stream.clone();
// Task 1: repeatedly read exactly 10 bytes, reverse them, and write them back.
// NOTE(review): the block is `async`, not `async move`, so it borrows
// `common_stream`; `tokio::spawn` is bounded by `'static` — confirm this is intended.
let handler1 = tokio::spawn(async {
loop {
// The guard taken here lives until the end of the loop body, i.e. across
// both `.await`s below; this is what makes the future non-`Send`.
let mut stream = common_stream.lock().unwrap();
let mut buf = [0u8; 10];
stream.read_exact(&mut buf).await.unwrap();
buf.reverse();
stream.write(&buf).await.unwrap();
}
});
let heartbeat_stream = stream.clone();
// Task 2: write the heartbeat byte `1`, then pause 200 ms, forever.
let handler2 = tokio::spawn(async {
loop {
// Same problem as in task 1: the std guard is held across the `.await`.
let mut stream = heartbeat_stream.lock().unwrap();
stream.write_u8(1).await.unwrap();
// `std::thread::sleep` blocks the executing thread instead of yielding to
// the runtime, and the lock guard is still held while it sleeps.
thread::sleep(Duration::from_millis(200));
}
});
// Wait for each task in turn; `?` propagates a join error from a task.
handler1.await?;
handler2.await?;
Ok(())
}
error: future cannot be sent between threads safely
--> src\main.rs:14:20
|
14 | let handler1 = tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
--> src\main.rs:20:31
|
16 | let mut stream = common_stream.lock().unwrap();
| ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
...
20 | stream.write(&buf).await.unwrap();
| ^^^^^^ await occurs here, with `mut stream` maybe used later
21 | }
| - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
--> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src\main.rs:25:20
|
25 | let handler2 = tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
--> src\main.rs:28:31
|
27 | let mut stream = heartbeat_stream.lock().unwrap();
| ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
28 | stream.write_u8(1).await.unwrap();
| ^^^^^^ await occurs here, with `mut stream` maybe used later
...
31 | }
| - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
--> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
感谢此处的任何建议。
谢谢。
你的代码中有几处错误,不过它背后的思路基本上是对的。你应该尽可能使用现有的异步工具。以下是一些必要的(或建议的)修改:
- 使用 tokio::time::sleep,因为它是异步的;否则该调用会阻塞线程
- 使用异步版本的互斥锁(例如 futures crate 提供的那个)
- 使用某种通用的错误处理方式(anyhow 会有所帮助)
use futures::lock::Mutex;
use anyhow::Error;
use tokio::time::sleep;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Error> {
let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
let stream = Arc::new(Mutex::new(stream));
let common_stream = stream.clone();
let handler1 = tokio::spawn(async move {
loop {
let mut stream = common_stream.lock().await;
let mut buf = [0u8; 10];
stream.read_exact(&mut buf).await.unwrap();
buf.reverse();
stream.write(&buf).await.unwrap();
}
});
let heartbeat_stream = stream.clone();
let handler2 = tokio::spawn(async move {
loop {
let mut stream = heartbeat_stream.lock().await;
stream.write_u8(1).await.unwrap();
sleep(Duration::from_millis(200)).await;
}
});
handler1.await?;
handler2.await?;
Ok(())
}
下面是另一种解决方案:将流拆分为读、写两个半部,并在一个循环中执行以下操作:
- 等待心跳事件,事件发生时向流的写半部发送一个字节
- 等待从读半部读到数据(10 字节),将其反转后再写回写半部
此外,这种方式不会生成额外的线程,所有工作都在当前任务中完成,并且不需要任何锁。
use anyhow::Result;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
/// Connects to `127.0.0.1:8888` and, in a single task without locks, both
/// sends the heartbeat byte `1` every 200 ms and echoes every received
/// 10-byte frame back reversed.
///
/// # Errors
/// Returns an error if the connection cannot be established, if a read or
/// write fails, or if the peer closes the connection.
#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8888").await?;
    let (mut read, mut write) = stream.split();
    let mut heartbeat_interval = tokio::time::interval(Duration::from_millis(200));
    let mut buf = [0u8; 10];
    // Number of bytes of the current frame already stored in `buf`.
    let mut filled = 0;
    loop {
        tokio::select! {
            _ = heartbeat_interval.tick() => {
                // `write_all` (unlike `write`) guarantees the byte is sent.
                write.write_all(&[1]).await?;
            }
            // `read_exact` is not cancellation safe: if the heartbeat branch
            // won while a frame was half read, that progress would be lost.
            // `read` is cancel safe, so the frame is accumulated across loop
            // iterations with `filled`. The slice is never empty because
            // `filled` is reset as soon as a frame completes.
            result = read.read(&mut buf[filled..]) => {
                let bytes_read = result?;
                if bytes_read == 0 {
                    // Zero bytes into a non-empty buffer means end of stream.
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "connection closed by peer",
                    )
                    .into());
                }
                filled += bytes_read;
                if filled == buf.len() {
                    buf.reverse();
                    write.write_all(&buf).await?;
                    filled = 0;
                }
            }
        }
    }
}
我有一个需求:在同一个 TcpStream 上收发正常数据,同时定期发送心跳数据。在当前的实现中,我使用了 Arc<Mutex<TcpStream>> 来达到这个目的,但编译时出现了错误。
use anyhow::Result;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
// The question's original attempt: connect to 127.0.0.1:8888 and share the one
// TcpStream between two spawned tasks through Arc<std::sync::Mutex<TcpStream>>.
// NOTE: this version does not compile. rustc reports that both spawned futures
// are not `Send`, because a std `MutexGuard` is kept alive across an `.await`.
#[tokio::main]
async fn main() -> Result<()> {
let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
// Shared ownership (Arc) plus a blocking std mutex around the stream.
let stream = Arc::new(Mutex::new(stream));
let common_stream = stream.clone();
// Task 1: repeatedly read exactly 10 bytes, reverse them, and write them back.
// NOTE(review): the block is `async`, not `async move`, so it borrows
// `common_stream`; `tokio::spawn` is bounded by `'static` — confirm this is intended.
let handler1 = tokio::spawn(async {
loop {
// The guard taken here lives until the end of the loop body, i.e. across
// both `.await`s below; this is what makes the future non-`Send`.
let mut stream = common_stream.lock().unwrap();
let mut buf = [0u8; 10];
stream.read_exact(&mut buf).await.unwrap();
buf.reverse();
stream.write(&buf).await.unwrap();
}
});
let heartbeat_stream = stream.clone();
// Task 2: write the heartbeat byte `1`, then pause 200 ms, forever.
let handler2 = tokio::spawn(async {
loop {
// Same problem as in task 1: the std guard is held across the `.await`.
let mut stream = heartbeat_stream.lock().unwrap();
stream.write_u8(1).await.unwrap();
// `std::thread::sleep` blocks the executing thread instead of yielding to
// the runtime, and the lock guard is still held while it sleeps.
thread::sleep(Duration::from_millis(200));
}
});
// Wait for each task in turn; `?` propagates a join error from a task.
handler1.await?;
handler2.await?;
Ok(())
}
error: future cannot be sent between threads safely
--> src\main.rs:14:20
|
14 | let handler1 = tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
--> src\main.rs:20:31
|
16 | let mut stream = common_stream.lock().unwrap();
| ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
...
20 | stream.write(&buf).await.unwrap();
| ^^^^^^ await occurs here, with `mut stream` maybe used later
21 | }
| - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
--> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src\main.rs:25:20
|
25 | let handler2 = tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl Future<Output = [async output]>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::TcpStream>`
note: future is not `Send` as this value is used across an await
--> src\main.rs:28:31
|
27 | let mut stream = heartbeat_stream.lock().unwrap();
| ---------- has type `std::sync::MutexGuard<'_, tokio::net::TcpStream>` which is not `Send`
28 | stream.write_u8(1).await.unwrap();
| ^^^^^^ await occurs here, with `mut stream` maybe used later
...
31 | }
| - `mut stream` is later dropped here
note: required by a bound in `tokio::spawn`
--> .cargo\registry\src\mirrors.tuna.tsinghua.edu.cn-df7c3c540f42cdbd\tokio-1.17.0\src\task\spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
感谢此处的任何建议。
谢谢。
你的代码中有几处错误,不过它背后的思路基本上是对的。你应该尽可能使用现有的异步工具。以下是一些必要的(或建议的)修改:
- 使用 tokio::time::sleep,因为它是异步的;否则该调用会阻塞线程
- 使用异步版本的互斥锁(例如 futures crate 提供的那个)
- 使用某种通用的错误处理方式(anyhow 会有所帮助)
use futures::lock::Mutex;
use anyhow::Error;
use tokio::time::sleep;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Error> {
let stream = TcpStream::connect("127.0.0.1:8888").await.unwrap();
let stream = Arc::new(Mutex::new(stream));
let common_stream = stream.clone();
let handler1 = tokio::spawn(async move {
loop {
let mut stream = common_stream.lock().await;
let mut buf = [0u8; 10];
stream.read_exact(&mut buf).await.unwrap();
buf.reverse();
stream.write(&buf).await.unwrap();
}
});
let heartbeat_stream = stream.clone();
let handler2 = tokio::spawn(async move {
loop {
let mut stream = heartbeat_stream.lock().await;
stream.write_u8(1).await.unwrap();
sleep(Duration::from_millis(200)).await;
}
});
handler1.await?;
handler2.await?;
Ok(())
}
下面是另一种解决方案:将流拆分为读、写两个半部,并在一个循环中执行以下操作:
- 等待心跳事件,事件发生时向流的写半部发送一个字节
- 等待从读半部读到数据(10 字节),将其反转后再写回写半部
此外,这种方式不会生成额外的线程,所有工作都在当前任务中完成,并且不需要任何锁。
use anyhow::Result;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
/// Connects to `127.0.0.1:8888` and, in a single task without locks, both
/// sends the heartbeat byte `1` every 200 ms and echoes every received
/// 10-byte frame back reversed.
///
/// # Errors
/// Returns an error if the connection cannot be established, if a read or
/// write fails, or if the peer closes the connection.
#[tokio::main]
async fn main() -> Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8888").await?;
    let (mut read, mut write) = stream.split();
    let mut heartbeat_interval = tokio::time::interval(Duration::from_millis(200));
    let mut buf = [0u8; 10];
    // Number of bytes of the current frame already stored in `buf`.
    let mut filled = 0;
    loop {
        tokio::select! {
            _ = heartbeat_interval.tick() => {
                // `write_all` (unlike `write`) guarantees the byte is sent.
                write.write_all(&[1]).await?;
            }
            // `read_exact` is not cancellation safe: if the heartbeat branch
            // won while a frame was half read, that progress would be lost.
            // `read` is cancel safe, so the frame is accumulated across loop
            // iterations with `filled`. The slice is never empty because
            // `filled` is reset as soon as a frame completes.
            result = read.read(&mut buf[filled..]) => {
                let bytes_read = result?;
                if bytes_read == 0 {
                    // Zero bytes into a non-empty buffer means end of stream.
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "connection closed by peer",
                    )
                    .into());
                }
                filled += bytes_read;
                if filled == buf.len() {
                    buf.reverse();
                    write.write_all(&buf).await?;
                    filled = 0;
                }
            }
        }
    }
}