Rust 中新线程上的异步循环:特性 std::future::Future 没有为 `()` 实现

Async loop on a new thread in rust: the trait `std::future::Future` is not implemented for `()`

我知道这个问题已经被问过很多次了,但我仍然不知道该怎么做(更多内容在下面)。

我正在尝试使用 std::thread::spawn 生成一个新线程,然后在其中 运行 一个异步循环。

我想要的异步函数运行:

#[tokio::main] 
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let mut scheduler = AsyncScheduler::new();

    scheduler.every(10.seconds()).run(move || {
        let arc_pool = pg_pool2.clone();
        let arc_config = config2.clone();
        async {
            pull_from_main(arc_pool, arc_config).await;
        }
    });

    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
}

生成线程到 运行 in:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}

错误:

error[E0277]: `()` is not a future
  --> src/main.rs:89:9
   |
89 |         pull_tweets(pg_pool2, config2).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `()` is not a future
   |
   = help: the trait `std::future::Future` is not implemented for `()`
   = note: required by `poll`

在概括问题方面做得非常出色。似乎在某些时候需要 return 值来实现 IntoFuture 而我没有。我尝试将 Ok(()) 添加到闭包和函数中,但没有成功。

`Result<(), ()>` is not a future

然后我注意到答案中专门讲了extension functions, which I'm not using. 也讲了扩展函数

其他一些 SO 答案:

所以 none 似乎有效。有人可以帮助我理解 1) 为什么这里存在错误以及 2) 如何解决它?

注1:将std::thread::spawn替换为tokio::task::spawn_blocking即可轻松解决所有这些问题。但我有意根据 this article.

试验线程生成

注意 2:关于我想要实现的目标的更广泛背景:我在异步循环中从 Twitter 中提取 150,000 条推文。我想比较 2 个实现:主 运行time 上的 运行ning 与单独线程上的 运行ning。后者是我挣扎的地方。

注意 3:在我看来,线程和异步任务是两个不同的原语,不能混用。即生成新线程不会影响任务的行为方式,生成新任务只会在现有线程上添加工作。如果这个世界观是错误的(以及我能读到的),请告诉我。

#[tokio::main] 将您的函数转换为以下内容:

#[tokio::main] 
pub fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let rt = tokio::Runtime::new();
    rt.block_on(async {
        let mut scheduler = AsyncScheduler::new();
        // ...
    });
}

请注意,它是一个 同步 函数,它生成一个新的运行时并运行内部 future 直至完成。你不需要 await 它,它是一个独立的运行时,有自己的专用线程池和调度程序:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || {
        pull_tweets(pg_pool2, config2);
    });
}

请注意,您的原始示例在另一个方面是错误的:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}

即使 pull_tweets 一个异步函数,线程也不会做任何事情,因为你所做的只是在 async 块中创建另一个未来.创建的 future 不会被执行,因为 futures 是 lazy(并且该线程中无论如何都没有执行程序上下文)。

我会构造代码以直接在新线程中生成运行时,并从那里调用您想要的任何 async 函数:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            pull_tweets(pg_pool2, config2).await;
        });
    });
}

pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    // ...
}