将 tokio::task::JoinHandle 存储在 HashMap 中并从另一个任务访问它

Storing tokio::task::JoinHandle in a HashMap and accessing it from another task

你好 Rustaceans,

我目前正在尝试使用 serenity-rs 将 JVM 应用程序(准确地说是一个 discord 机器人)移植到 rust 以进行娱乐和教育,并且我目前正在使用 tokio 任务和共享状态。我对整个 Rust 体验很陌生,所以请保持温柔。

基本思想是,一些任务异步启动,并在等待一段时间后将一些数据插入到共享(并发)映射中。该程序的另一部分正在监听事件,如果在第一个任务仍在等待时发生此类事件,它将取消该任务,从而导致数据无法插入到共享映射中。

我从一个“简化的”示例开始,它看起来像这样并且只依赖于 tokio。

[dependencies]
tokio = { version = "1", features = ["full"] }
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use tokio::join;
use tokio::sync::RwLock;
use tokio::time;

#[tokio::main]
async fn main() {
    let mut data: Arc<RwLock<HashMap<_,_>>> =
        Arc::new(RwLock::new(HashMap::new()));

    let mut tasks: Arc<RwLock<HashMap<String, _>>> =
        Arc::new(RwLock::new(HashMap::new()));

    let mut d1 = data.clone();
    let handle_a = tokio::spawn(async move {
        time::sleep(Duration::from_secs(30)).await;
        {
            let mut lock = d1.write().await;
            lock.insert("foo", "bar");
        }
    });

    let handle_a = Arc::new(handle_a);
    {
        let mut map = tasks.write().await;
        map.insert("the_task".to_string(), handle_a.clone());
    }

    let mut d2 = data.clone();
    let handle_b = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(10)).await;
        {
            let d = d2.read().await;
            let res = d.get("foo");
            println!("After 10sec: {}", res.unwrap_or(&"None"));
            let mut m = tasks.write().await;
            {
                let t = m.get("the_task").unwrap();
                println!("Now cancelling: {:?}", t);
                t.abort();
                let _ = m.remove("the_task");
            }
        }
        tokio::time::sleep(Duration::from_secs(25)).await;
        {
            let d = d2.read().await;
            let res = d.get("foo").unwrap_or(&"None");
            println!("After 35sec: {}", res)
        }
    });

    join!(handle_a, handle_b);
}

当前编译时出现以下错误:

55 |     join!(handle_a, handle_b);
   |           ^^^^^^^^ `Arc<tokio::task::JoinHandle<()>>` is not a future

我将 handle_a 包装在 Arc 中,因为我想在“主函数结束时”等待它,同时仍然能够将对它的引用放入任务映射中。所以这显然似乎是错误的,但我想不出另一种方式来处理这个问题。简单地在 handle_a 上调用 deref() 会给出不同的错误:

`Future` is implemented for `&mut tokio::task::JoinHandle<()>`, but not for `&tokio::task::JoinHandle<()>

我想这是有道理的,因为 Arc 状态的文档:

Shared references in Rust disallow mutation by default, and Arc is no exception: you cannot generally obtain a mutable reference to something inside an Arc.

我认为 Rust 中 futures 的“基于拉动”的方法才是真正让我在这里挣扎的原因,因为我需要在 main 函数结束时引用 JoinHandle 来等待它。

可能这是解决这个问题的完全错误的方法,所以我非常感谢任何正确方向的提示或推动。

已经感谢您的宝贵时间,如果您读到这里!

编辑:修复了已接受答案中提到的拼写错误。

tokio::spawn 已经在后台执行未来。如果你想等待它的结果,你只需要等待它,在这种情况下你不需要。 您可以简单地将其替换为 b.await.

您在任务名称中也有一个小逻辑错误 - 您在一个查询中使用 "thetask" 而不是 "the_task"。 我已经稍微清理了代码,您可以在 playground.

上找到工作版本

我不熟悉 Discord 机器人的设计,但如果你只有一个一直 运行ning 的“事件循环”,负责创建和取消任务,你就不会不需要使用 tokio::spawn 为它创建一个任务,但可以直接在 main 函数中 运行 它。这意味着它可以独占 tasks 地图,您不会 运行 陷入共享所有权的问题。