为什么`tokio::main`报错"cycle detected when processing"?

Why does `tokio::main` report the error "cycle detected when processing"?

我正在使用 Tokio 和 async/.await 创建一个 UDP 服务器,我可以在其中以异步方式接收和发送数据。

我的 UDP 套接字的 SendHalf 被多个任务共享。为此,我使用 Arc<Mutex<SendHalf>>。这就是 Arc<Mutex<_>> 存在的原因。

use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;

struct Packet {
    sender: Arc<Mutex<SendHalf>>,
    buf: [u8; 512],
    addr: SocketAddr,
}

#[tokio::main]
async fn main() {
    let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
    let (mut server_rx, mut server_tx) = server.split();
    let sender = Arc::new(Mutex::new(server_tx));
    let (mut tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        loop {
            let mut buffer = [0; 512];
            let (_, src) = server_rx.recv_from(&mut buffer).await.unwrap();
            let packet = Packet {
                sender: sender.clone(),
                buf: buffer,
                addr: src,
            };
            tx.send(packet).await;
        }
    });

    while let Some(packet) = rx.recv().await {
        tokio::spawn(async move {
            let mut socket = packet.sender.lock().unwrap();
            socket.send_to(&packet.buf, &packet.addr).await.unwrap();
        });
    }
}

这里还有一个Playground.

我遇到了一个我不理解的编译器错误:

error[E0391]: cycle detected when processing `main`
  --> src/main.rs:13:1
   |
13 | #[tokio::main]
   | ^^^^^^^^^^^^^^
   |
note: ...which requires processing `main::{{closure}}#0::{{closure}}#1`...
  --> src/main.rs:34:33
   |
34 |           tokio::spawn(async move {
   |  _________________________________^
35 | |             let mut socket = packet.sender.lock().unwrap();
36 | |             socket.send_to(&packet.buf, &packet.addr).await.unwrap();
37 | |         });
   | |_________^
   = note: ...which again requires processing `main`, completing the cycle
note: cycle used when processing `main::{{closure}}#0`
  --> src/main.rs:13:1
   |
13 | #[tokio::main]
   | ^^^^^^^^^^^^^^

为什么我的代码会产生一个循环?为什么调用需要处理 main

错误的详细含义是什么?我想了解发生了什么。

根据 tokio 文档,当谈到 using !Send value from a task 时:

Holding on to a !Send value across calls to .await will result in an unfriendly compile error message similar to:

[... some type ...] cannot be sent between threads safely

or:

error[E0391]: cycle detected when processing main

您遇到的正是这个错误。当你 lock a Mutex:

pub fn lock(&self) -> LockResult<MutexGuard<T>>

它returns一个MutexGuard,也就是!Send:

impl<'_, T: ?Sized> !Send for MutexGuard<'_, T>

编译正常:

#[tokio::main]
async fn main() {
    ...

    while let Some(packet) = rx.recv().await {
        let mut socket = packet.sender.lock().unwrap();
        socket.send_to(&packet.buf, &packet.addr).await.unwrap();
    }
}

我刚刚 运行 遇到了一个非常相似的问题(我使用的是 RwLockMutex,但结构几乎完全相同)。我想异步处理每个 UDP 数据包,而不是像这里的其他一些解决方案那样在读取另一个数据包之前等待处理。

我为解决问题所做的更改是使用 tokio::sync::* 同步结构,而不是 std 中的同步结构。因此,在您的情况下,将 std::sync::Mutex 切换为 tokio::sync::Mutex(并将 .unwrap() 切换为 .await),您应该没问题。 (Documentation)

我对你的 playground 进行了更改,现在正在编译: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2b6114a12a5f784a38a595bc521f6f02

(也非常感谢Tokio Slack在我遇到同样问题时为我提供了答案,只是传递一下)。