混select!和 Rust 中的异步调用

Mixing select! and async calls in Rust

我正在构建一个小型应用程序,它应该以不同的时间间隔安排两项任务(基于 rusoto AWS SDK):每 X 秒,运行 一个任务,以及每 Y 秒,运行另一个.

我找到了 crate crossbeam,它提供了一个滴答计时器和一个 select! 宏,并将它们放在一起,如下所示:

fn main() -> Result<(), Error> {
  let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
  let rt = Runtime::new().unwrap();
  let tick_a = tick(Duration::from_secs(60));
  let tick_b = tick(Duration::from_secs(30));

  loop {
    select! {
      recv(tick_a) -> _ => {
        rt.block_on(cloudwatch_client.put_metric_data( /* ... */ ));
      },
      /* similar for tick_b */
    }
  }
}

这可以编译,但是程序因 thread 'main' panicked at 'not currently running on the Tokio runtime.' 而出现混乱。通过分析回溯,这似乎来自 Rusoto 调用。

我在这里错过了什么?有没有办法使这项工作?有没有更好的方法来处理 Rust 中间隔时间的任务调度?

请注意, 问题似乎没有解决我的问题。该问题从使用 futures::executor::block_on 函数开始,并通过使用 tokio Runtime 实现的 block_on 方法解决。我已经在 Runtime.

中使用 block_on 方法

CloudWatchClient::put_metric_data returns a RusotoFuture which has a sync 方法可以满足您的需求。所以代替:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
let rt = Runtime::new().unwrap();
rt.block_on(cloudwatch_client.put_metric_data(/* ... */));

你可以这样做:

let cloudwatch_client = rusoto_cloudwatch::CloudWatchClient::new();
cloudwatch_client.put_metric_data(/* ... */).sync();

Which is also what the official docs recommend.

thread 'main' panicked at 'not currently running on the Tokio runtime.'

如果特定库所需的 tokio 运行time 版本未主动 运行ning,则会出现此错误 - 因为每个主要版本都使用不同的线程局部变量并且同一个构建中可以包含一个库的 1 个以上主要版本。

在你的情况下,你可能有 Tokio 0.3 运行time 运行ning,但 rusoto 期望 Tokio 0.2 运行time。当 rusoto 然后尝试通过 Tokio 0.2(也包含在构建中)执行 IO 时,那个检测到没有 运行time 处于活动状态并产生错误。

要解决此问题,请确保在您的项目中只使用一个 tokio 版本。您可能需要通过 Cargo.toml 将 tokio 降级到 0.2,因为可能没有更新的 rusoto 版本可用。

还有一个不相关的建议:

您还可以 运行 tokio 运行time 中的“整体”,而不是使用横梁作为计时器:您可以使用 tokio::select!和 tokio 计时器,用于在此处执行您对横梁所做的操作。

https://docs.rs/tokio/0.2.24/tokio/time/fn.interval.htmlhttps://docs.rs/tokio/0.2.24/tokio/macro.select.html(有与您的用例相似的示例)

请注意:

Note that from v0.43.0 onward, Rusoto uses Rust's std::future::Future, and the Tokio 0.2 ecosystem. From v0.46.0 onward, Rusoto uses the Tokio 1.0 ecosystem.

对于简单的 S3 调用,我建议使用以下依赖项:

[dependencies]  
rusoto_core = "0.46.0"                            
rusoto_s3 = "0.46.0"                                                    
tokio = {version = "1.0",  features = ["full"]}  

调用可以这样实现:

use rusoto_core::Region;
use rusoto_s3::{S3, S3Client};

#[tokio::main]
async fn main() {
    let region = Region::UsEast1;
    let client = S3Client::new(region);

    match client.list_buckets().await {
        Ok(output) => match output.buckets {
            Some(bucket_list) => {
                println!("Buckets in S3:");

                for bucket in bucket_list {
                    println!("{:?}", bucket.name);
                }
            }
            None => println!("No buckets!"),
        },
        Err(error) => {
            println!("Error: {:?}", error);
        }
    }
}

设置凭据以避免凭据错误。