将 tokio::task::JoinHandle 存储在 HashMap 中并从另一个任务访问它
Storing tokio::task::JoinHandle in a HashMap and accessing it from another task
你好 Rustaceans,
我目前正在尝试使用 serenity-rs 将 JVM 应用程序(准确地说是一个 discord 机器人)移植到 rust 以进行娱乐和教育,并且我目前正在使用 tokio 任务和共享状态。我对整个 Rust 体验很陌生,所以请保持温柔。
基本思想是,一些任务异步启动,并在等待一段时间后将一些数据插入到共享(并发)映射中。该程序的另一部分正在监听事件,如果在第一个任务仍在等待时发生此类事件,它将取消该任务,从而导致数据无法插入到共享映射中。
我从一个“简化的”示例开始,它看起来像这样并且只依赖于 tokio。
[dependencies]
tokio = { version = "1", features = ["full"] }
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::join;
use tokio::sync::RwLock;
use tokio::time;
#[tokio::main]
async fn main() {
let mut data: Arc<RwLock<HashMap<_,_>>> =
Arc::new(RwLock::new(HashMap::new()));
let mut tasks: Arc<RwLock<HashMap<String, _>>> =
Arc::new(RwLock::new(HashMap::new()));
let mut d1 = data.clone();
let handle_a = tokio::spawn(async move {
time::sleep(Duration::from_secs(30)).await;
{
let mut lock = d1.write().await;
lock.insert("foo", "bar");
}
});
let handle_a = Arc::new(handle_a);
{
let mut map = tasks.write().await;
map.insert("the_task".to_string(), handle_a.clone());
}
let mut d2 = data.clone();
let handle_b = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
{
let d = d2.read().await;
let res = d.get("foo");
println!("After 10sec: {}", res.unwrap_or(&"None"));
let mut m = tasks.write().await;
{
let t = m.get("the_task").unwrap();
println!("Now cancelling: {:?}", t);
t.abort();
let _ = m.remove("the_task");
}
}
tokio::time::sleep(Duration::from_secs(25)).await;
{
let d = d2.read().await;
let res = d.get("foo").unwrap_or(&"None");
println!("After 35sec: {}", res)
}
});
join!(handle_a, handle_b);
}
当前编译时出现以下错误:
55 | join!(handle_a, handle_b);
| ^^^^^^^^ `Arc<tokio::task::JoinHandle<()>>` is not a future
我将 handle_a
包装在 Arc
中,因为我想在“主函数结束时”等待它,同时仍然能够将对它的引用放入任务映射中。所以这显然似乎是错误的,但我想不出另一种方式来处理这个问题。简单地在 handle_a
上调用 deref()
会给出不同的错误:
`Future` is implemented for `&mut tokio::task::JoinHandle<()>`, but not for `&tokio::task::JoinHandle<()>
我想这是有道理的,因为 Arc
状态的文档:
Shared references in Rust disallow mutation by default, and Arc is no
exception: you cannot generally obtain a mutable reference to
something inside an Arc.
我认为 Rust 中 futures 的“基于拉动”的方法才是真正让我在这里挣扎的原因,因为我需要在 main 函数结束时引用 JoinHandle 来等待它。
可能这是解决这个问题的完全错误的方法,所以我非常感谢任何正确方向的提示或推动。
已经感谢您的宝贵时间,如果您读到这里!
编辑:修复了已接受答案中提到的拼写错误。
tokio::spawn
已经在后台执行未来。如果你想等待它的结果,你只需要等待它,在这种情况下你不需要。
您可以简单地将其替换为 b.await
.
您在任务名称中也有一个小逻辑错误 - 您在一个查询中使用 "thetask"
而不是 "the_task"
。
我已经稍微清理了代码,您可以在 playground.
上找到工作版本
我不熟悉 Discord 机器人的设计,但如果你只有一个一直 运行ning 的“事件循环”,负责创建和取消任务,你就不会不需要使用 tokio::spawn
为它创建一个任务,但可以直接在 main
函数中 运行 它。这意味着它可以独占 tasks
地图,您不会 运行 陷入共享所有权的问题。
你好 Rustaceans,
我目前正在尝试使用 serenity-rs 将 JVM 应用程序(准确地说是一个 discord 机器人)移植到 rust 以进行娱乐和教育,并且我目前正在使用 tokio 任务和共享状态。我对整个 Rust 体验很陌生,所以请保持温柔。
基本思想是,一些任务异步启动,并在等待一段时间后将一些数据插入到共享(并发)映射中。该程序的另一部分正在监听事件,如果在第一个任务仍在等待时发生此类事件,它将取消该任务,从而导致数据无法插入到共享映射中。
我从一个“简化的”示例开始,它看起来像这样并且只依赖于 tokio。
[dependencies]
tokio = { version = "1", features = ["full"] }
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::join;
use tokio::sync::RwLock;
use tokio::time;
#[tokio::main]
async fn main() {
let mut data: Arc<RwLock<HashMap<_,_>>> =
Arc::new(RwLock::new(HashMap::new()));
let mut tasks: Arc<RwLock<HashMap<String, _>>> =
Arc::new(RwLock::new(HashMap::new()));
let mut d1 = data.clone();
let handle_a = tokio::spawn(async move {
time::sleep(Duration::from_secs(30)).await;
{
let mut lock = d1.write().await;
lock.insert("foo", "bar");
}
});
let handle_a = Arc::new(handle_a);
{
let mut map = tasks.write().await;
map.insert("the_task".to_string(), handle_a.clone());
}
let mut d2 = data.clone();
let handle_b = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
{
let d = d2.read().await;
let res = d.get("foo");
println!("After 10sec: {}", res.unwrap_or(&"None"));
let mut m = tasks.write().await;
{
let t = m.get("the_task").unwrap();
println!("Now cancelling: {:?}", t);
t.abort();
let _ = m.remove("the_task");
}
}
tokio::time::sleep(Duration::from_secs(25)).await;
{
let d = d2.read().await;
let res = d.get("foo").unwrap_or(&"None");
println!("After 35sec: {}", res)
}
});
join!(handle_a, handle_b);
}
当前编译时出现以下错误:
55 | join!(handle_a, handle_b);
| ^^^^^^^^ `Arc<tokio::task::JoinHandle<()>>` is not a future
我将 handle_a
包装在 Arc
中,因为我想在“主函数结束时”等待它,同时仍然能够将对它的引用放入任务映射中。所以这显然似乎是错误的,但我想不出另一种方式来处理这个问题。简单地在 handle_a
上调用 deref()
会给出不同的错误:
`Future` is implemented for `&mut tokio::task::JoinHandle<()>`, but not for `&tokio::task::JoinHandle<()>
我想这是有道理的,因为 Arc
状态的文档:
Shared references in Rust disallow mutation by default, and Arc is no exception: you cannot generally obtain a mutable reference to something inside an Arc.
我认为 Rust 中 futures 的“基于拉动”的方法才是真正让我在这里挣扎的原因,因为我需要在 main 函数结束时引用 JoinHandle 来等待它。
可能这是解决这个问题的完全错误的方法,所以我非常感谢任何正确方向的提示或推动。
已经感谢您的宝贵时间,如果您读到这里!
编辑:修复了已接受答案中提到的拼写错误。
tokio::spawn
已经在后台执行未来。如果你想等待它的结果,你只需要等待它,在这种情况下你不需要。
您可以简单地将其替换为 b.await
.
您在任务名称中也有一个小逻辑错误 - 您在一个查询中使用 "thetask"
而不是 "the_task"
。
我已经稍微清理了代码,您可以在 playground.
我不熟悉 Discord 机器人的设计,但如果你只有一个一直 运行ning 的“事件循环”,负责创建和取消任务,你就不会不需要使用 tokio::spawn
为它创建一个任务,但可以直接在 main
函数中 运行 它。这意味着它可以独占 tasks
地图,您不会 运行 陷入共享所有权的问题。