东京 local_set spawn_local 为什么不 运行?

tokio local_set spawn_local why not run?

我在学习tokiotask的时候遇到了执行LocalSet.spawn_local的问题,spawn_localfuture没有执行,[=的future 19=] 被执行。

代码

 #[tokio::test]
    async fn test_localset_spawn_local() {
        let local_set = task::LocalSet::new();

        let run_count = Arc::new(AtomicUsize::new(0));
        let run_count_0 = run_count.clone();
        let run_count_1 = run_count.clone();
        let run_count_2 = run_count.clone();
        let run_count_3 = run_count.clone();

        let f1 = async move {
            log("spawn_local task1 running");
            let _ = run_count_0.fetch_add(1, Ordering::SeqCst);
        };
        let f2 = async move {
            log("spawn_local task2 running");
            let _ = run_count_1.fetch_add(1, Ordering::SeqCst);
        };
        let f3 = async move {
            log("spawn_local task3 running");
            let _ = run_count_2.fetch_add(1, Ordering::SeqCst);
        };
        let f4 = async move {
            log("spawn_local task4 running");
            let _ = run_count_3.fetch_add(1, Ordering::SeqCst);
        };

        local_set.spawn_local(f1);

        let _ = local_set.run_until(f2).await;

        local_set.spawn_local(f3);

        let _ = local_set.run_until(f4).await;

        assert!(run_count.load(Ordering::Acquire) == 4);
    }

错误信息:


running 1 test
2022-03-19 17:23:11 ("test::test_localset_spawn_local") - spawn_local task2 running
2022-03-19 17:23:11 ("test::test_localset_spawn_local") - spawn_local task4 running
thread 'test::test_localset_spawn_local' panicked at 'assertion failed: run_count.load(Ordering::Acquire) == 4', src\main.rs:253:9
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\/library\std\src\panicking.rs:498
   1: core::panicking::panic_fmt
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\/library\core\src\panicking.rs:107
   2: core::panicking::panic
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\/library\core\src\panicking.rs:48
   3: _10_tokio::test::test_localset_spawn_local::generator[=14=]
             at .\src\main.rs:253
   4: core::future::from_generator::impl::poll<_10_tokio::test::test_localset_spawn_local::generator[=14=]>
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\future\mod.rs:80
   5: core::future::future::impl::poll<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator[=14=]> > >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\future\future.rs:119
   6: tokio::runtime::basic_scheduler::impl::block_on::closure[=14=]::closure[=14=]::closure[=14=]<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator[=14=]> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:498
   7: tokio::coop::with_budget::closure[=14=]<enum$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::basic_scheduler::impl::block_on::closure[=14=]::closure[=14=]::closure[=14=]>
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\coop.rs:102
   8: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::try_with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure[=14=],enum$<core::task::poll::Poll<tuple$<> > > >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\std\src\thread\local.rs:399
   9: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure[=14=],enum$<core::task::poll::Poll<tuple$<> > > >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\std\src\thread\local.rs:375
  10: tokio::coop::with_budget
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\coop.rs:95
  11: tokio::coop::budget
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\coop.rs:72
  12: tokio::runtime::basic_scheduler::impl::block_on::closure[=14=]::closure[=14=]<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator[=14=]> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:498
  13: tokio::runtime::basic_scheduler::Context::enter<enum$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::basic_scheduler::impl::block_on::closure[=14=]::closure[=14=]>
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:356
  14: tokio::runtime::basic_scheduler::impl::block_on::closure[=14=]<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator[=14=]> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:497
  15: tokio::runtime::basic_scheduler::impl::enter::closure[=14=]<tokio::runtime::basic_scheduler::impl::block_on::closure[=14=],tuple$<> >    
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:555
  16: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::basic_scheduler::Context>::set<tokio::runtime::basic_scheduler::Context,tokio::runtime::basic_scheduler::impl::enter::closure[=14=],tuple$<alloc::boxed::Box<tokio::runtime::basic_scheduler::Core,alloc::al
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\macros\scoped_tls.rs:61
  17: tokio::runtime::basic_scheduler::CoreGuard::enter<tokio::runtime::basic_scheduler::impl::block_on::closure[=14=],tuple$<> >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:555
  18: tokio::runtime::basic_scheduler::CoreGuard::block_on<core::pin::Pin<ref_mut$<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator[=14=]> > > >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:488
  19: tokio::runtime::basic_scheduler::BasicScheduler::block_on<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator[=14=]> >
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\basic_scheduler.rs:168
  20: tokio::runtime::Runtime::block_on<core::future::from_generator::GenFuture<_10_tokio::test::test_localset_spawn_local::generator[=14=]> > 
             at E:\scoop\global\persist\rustup-msvc\.cargo\registry\src\rsproxy.cn-8f6827c7555bfaf8\tokio-1.17.0\src\runtime\mod.rs:475    
  21: _10_tokio::test::test_localset_spawn_local
             at .\src\main.rs:253
  22: _10_tokio::test::test_localset_spawn_local::closure[=14=]
             at .\src\main.rs:219
  23: core::ops::function::FnOnce::call_once<_10_tokio::test::test_localset_spawn_local::closure[=14=],tuple$<> >
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\ops\function.rs:227
  24: core::ops::function::FnOnce::call_once
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b\library\core\src\ops\function.rs:227
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
test test::test_localset_spawn_local ... FAILED

failures:

failures:
    test::test_localset_spawn_local

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 11 filtered out; finished in 0.04s

error: test failed, to rerun pass '-p _10_tokio --bin _10_tokio'
The terminal process "E:\scoop\global\apps\rustup-msvc\current\.cargo\bin\cargo.exe 'test', '--package', '_10_tokio', '--bin', '_10_tokio', '--', 'test::test_localset_spawn_local', '--exact', '--nocapture'" terminated with exit code: 101.

最小化self-contained示例:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::task;

async fn run(count: Arc<AtomicUsize>, name: &str) {
    println!("{} running", name);
    let _ = count.fetch_add(1, Ordering::SeqCst);
}

#[tokio::main]
async fn main() {
    let local_set = task::LocalSet::new();
    let run_count = Arc::new(AtomicUsize::new(0));

    local_set.spawn_local(run(run_count.clone(), "spawn_local"));
    local_set
        .run_until(run(run_count.clone(), "run_until"))
        .await;

    assert_eq!(run_count.load(Ordering::Acquire), 2);
}

Playground

这个错误的原因是生成的期货(原始代码中的 async 块,示例中的 run() )没有等待任何东西,即它们立即 return Poll::Ready。因此,当您 run_until 他们时,您不会将控制权交给执行者 - run_until().await 会立即成功。另一方面,spawn_local 不会立即启动 - 它会等待其他某个未来的 await;所以它没有机会 运行 在你点击 assert 并失败之前。

通过在计数器更新后添加 sleep,以便执行程序能够 运行 完成两个任务,断言失败消失:

async fn run(count: Arc<AtomicUsize>, name: &str) {
    println!("{} running", name);
    let _ = count.fetch_add(1, Ordering::SeqCst);
    // This yields to the executor, allowing another task to be polled
    tokio::time::sleep(Duration::from_millis(1)).await;
}

Updated playground