异步 Rust 中的多线程 - 为什么我的代码无法并行化?

Multi-threading in async rust - why is my code failing to parallelize?

我试图通过 运行 执行以下函数来有意用尽 API 限制(900 次调用):

#[get("/exhaust")]
pub async fn exhaust(_pool: web::Data<PgPool>, config: web::Data<Arc<Settings>>) -> impl Responder {
    let mut handles = vec![];

    for i in 1..900 {
        let inner_config = config.clone();
        let handle = thread::spawn(move || async move {
            println!("running thread {}", i);
            get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
                .await
                .unwrap();
        });
        handles.push(handle);
    }

    for h in handles {
        h.join().unwrap().await;
    }

    HttpResponse::Ok()

我的机器有 16 个内核,所以我预计上面的 运行 比单线程函数快 16 倍,但事实并非如此。事实上,它 运行 与单线程版本一样慢。

这是为什么?我错过了什么?

注意:move || async move 部分对我来说看起来有点奇怪,但我是按照编译器的建议到达那里的。由于 async closures being unstable,它不会让我将异步放在第一步的旁边。这可能是问题所在吗?

问题是您以一种导致所有工作顺序进行的方式混合了多线程和异步:您的所有线程所做的就是调用 get_single_tweet,这显然是一个 async 函数。

现在在像 Javascript 这样的语言中,get_single_tweet 会创建一个 任务 ,这会 return 一个 promise 象征着任务的实现,运行尽快

这不是 Rust 的工作方式(或者许多其他语言,顺便说一下,Python 的行为比 Javascript 更像 Rust)。在 Rust 中,get_single_tweet 只是创建了一个 future,它实际上并没有 do 任何东西,future 必须是 polled for things to happen: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=b26b47e62e46b66b60844aabc2ea7be1

这次投票什么时候进行?当 await-ing 的动态链到达事件循环的顶部时。

这里的 future 是在线程中创建的,然后从线程中 returned,然后从 join 中获取时 awaited,所以你的获取不是 运行 在线程中,它们在这里 运行:

    for h in handles {
        h.join().unwrap().await;
    }

这是完全连续的。

现在要解决这个问题,您基本上有两个选择,但重要的是要保持在您的领域内:

  • 如果你想使用线程,那么线程中的调用应该阻塞
  • 否则,您可能想使用 task::spawn 而不是 thread::spawn(或您选择的 运行 中的任何等效项),确保您使用的是多线程 运行time 可能是个好主意,但在这里我假设 get_single_tweet 正在执行 IO 工作(例如从 twitter 获取内容),因此单线程 运行time 可能会产生最多已经受益

task::spawn 将(如其名称所示)创建一个 任务 和 return 它的句柄,这更接近于 Javascript的async函数。

此代码确实会 运行 您的 async 同步阻塞。 async 块创建了一个实现 Future 的类型,但需要知道的是 Futures 不会自行启动 运行ning,它们要么是await-ed 或交给 运行 的执行人。

调用 thread::spawn 并带有一个 returns 一个 Future 的闭包将不会执行它们;线程只是创建 async 块并返回。所以 async 块实际上并没有被执行,直到你 await 它们在 handles 的循环中,这将按顺序处理期货。

解决此问题的一种方法是同时使用 futures 箱子中的 join_all 到 运行 它们。

let mut futs = vec![];

for i in 1..900 {
    let inner_config = config.clone();
    futs.push(async move {
        println!("running thread {}", i);
        get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
            .await
            .unwrap();
    });
}

futures::future::join_all(futs).await;