异步 Rust 中的多线程 - 为什么我的代码无法并行化?
Multi-threading in async rust - why is my code failing to parallelize?
我试图通过 运行 执行以下函数来有意用尽 API 限制(900 次调用):
#[get("/exhaust")]
pub async fn exhaust(_pool: web::Data<PgPool>, config: web::Data<Arc<Settings>>) -> impl Responder {
let mut handles = vec![];
for i in 1..900 {
let inner_config = config.clone();
let handle = thread::spawn(move || async move {
println!("running thread {}", i);
get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
.await
.unwrap();
});
handles.push(handle);
}
for h in handles {
h.join().unwrap().await;
}
HttpResponse::Ok()
我的机器有 16 个内核,所以我预计上面的 运行 比单线程函数快 16 倍,但事实并非如此。事实上,它 运行 与单线程版本一样慢。
这是为什么?我错过了什么?
注意:move || async move
部分对我来说看起来有点奇怪,但我是按照编译器的建议到达那里的。由于 async closures being unstable
,它不会让我将异步放在第一步的旁边。这可能是问题所在吗?
问题是您以一种导致所有工作顺序进行的方式混合了多线程和异步:您的所有线程所做的就是调用 get_single_tweet
,这显然是一个 async
函数。
现在在像 Javascript 这样的语言中,get_single_tweet
会创建一个 任务 ,这会 return 一个 promise 象征着任务的实现,运行尽快
这不是 Rust 的工作方式(或者许多其他语言,顺便说一下,Python 的行为比 Javascript 更像 Rust)。在 Rust 中,get_single_tweet
只是创建了一个 future,它实际上并没有 do 任何东西,future 必须是 polled for things to happen: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b26b47e62e46b66b60844aabc2ea7be1
这次投票什么时候进行?当 await-ing 的动态链到达事件循环的顶部时。
这里的 future 是在线程中创建的,然后从线程中 returned,然后从 join
中获取时 await
ed,所以你的获取不是 运行 在线程中,它们在这里 运行:
for h in handles {
h.join().unwrap().await;
}
这是完全连续的。
现在要解决这个问题,您基本上有两个选择,但重要的是要保持在您的领域内:
- 如果你想使用线程,那么线程中的调用应该阻塞
- 否则,您可能想使用
task::spawn
而不是 thread::spawn
(或您选择的 运行 中的任何等效项),确保您使用的是多线程 运行time 可能是个好主意,但在这里我假设 get_single_tweet
正在执行 IO 工作(例如从 twitter 获取内容),因此单线程 运行time 可能会产生最多已经受益
task::spawn
将(如其名称所示)创建一个 任务 和 return 它的句柄,这更接近于 Javascript的async
函数。
此代码确实会 运行 您的 async
同步阻塞。 async
块创建了一个实现 Future
的类型,但需要知道的是 Future
s 不会自行启动 运行ning,它们要么是await
-ed 或交给 运行 的执行人。
调用 thread::spawn
并带有一个 returns 一个 Future
的闭包将不会执行它们;线程只是创建 async
块并返回。所以 async
块实际上并没有被执行,直到你 await
它们在 handles
的循环中,这将按顺序处理期货。
解决此问题的一种方法是同时使用 futures
箱子中的 join_all
到 运行 它们。
let mut futs = vec![];
for i in 1..900 {
let inner_config = config.clone();
futs.push(async move {
println!("running thread {}", i);
get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
.await
.unwrap();
});
}
futures::future::join_all(futs).await;
我试图通过 运行 执行以下函数来有意用尽 API 限制(900 次调用):
#[get("/exhaust")]
pub async fn exhaust(_pool: web::Data<PgPool>, config: web::Data<Arc<Settings>>) -> impl Responder {
let mut handles = vec![];
for i in 1..900 {
let inner_config = config.clone();
let handle = thread::spawn(move || async move {
println!("running thread {}", i);
get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
.await
.unwrap();
});
handles.push(handle);
}
for h in handles {
h.join().unwrap().await;
}
HttpResponse::Ok()
我的机器有 16 个内核,所以我预计上面的 运行 比单线程函数快 16 倍,但事实并非如此。事实上,它 运行 与单线程版本一样慢。
这是为什么?我错过了什么?
注意:move || async move
部分对我来说看起来有点奇怪,但我是按照编译器的建议到达那里的。由于 async closures being unstable
,它不会让我将异步放在第一步的旁边。这可能是问题所在吗?
问题是您以一种导致所有工作顺序进行的方式混合了多线程和异步:您的所有线程所做的就是调用 get_single_tweet
,这显然是一个 async
函数。
现在在像 Javascript 这样的语言中,get_single_tweet
会创建一个 任务 ,这会 return 一个 promise 象征着任务的实现,运行尽快
这不是 Rust 的工作方式(或者许多其他语言,顺便说一下,Python 的行为比 Javascript 更像 Rust)。在 Rust 中,get_single_tweet
只是创建了一个 future,它实际上并没有 do 任何东西,future 必须是 polled for things to happen: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b26b47e62e46b66b60844aabc2ea7be1
这次投票什么时候进行?当 await-ing 的动态链到达事件循环的顶部时。
这里的 future 是在线程中创建的,然后从线程中 returned,然后从 join
中获取时 await
ed,所以你的获取不是 运行 在线程中,它们在这里 运行:
for h in handles {
h.join().unwrap().await;
}
这是完全连续的。
现在要解决这个问题,您基本上有两个选择,但重要的是要保持在您的领域内:
- 如果你想使用线程,那么线程中的调用应该阻塞
- 否则,您可能想使用
task::spawn
而不是thread::spawn
(或您选择的 运行 中的任何等效项),确保您使用的是多线程 运行time 可能是个好主意,但在这里我假设get_single_tweet
正在执行 IO 工作(例如从 twitter 获取内容),因此单线程 运行time 可能会产生最多已经受益
task::spawn
将(如其名称所示)创建一个 任务 和 return 它的句柄,这更接近于 Javascript的async
函数。
此代码确实会 运行 您的 async
同步阻塞。 async
块创建了一个实现 Future
的类型,但需要知道的是 Future
s 不会自行启动 运行ning,它们要么是await
-ed 或交给 运行 的执行人。
调用 thread::spawn
并带有一个 returns 一个 Future
的闭包将不会执行它们;线程只是创建 async
块并返回。所以 async
块实际上并没有被执行,直到你 await
它们在 handles
的循环中,这将按顺序处理期货。
解决此问题的一种方法是同时使用 futures
箱子中的 join_all
到 运行 它们。
let mut futs = vec![];
for i in 1..900 {
let inner_config = config.clone();
futs.push(async move {
println!("running thread {}", i);
get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
.await
.unwrap();
});
}
futures::future::join_all(futs).await;