How to use future::join_all with the multiplexed redis in the async tokio runtime

I'm trying to use the Rust redis client in asynchronous multiplexed mode, with tokio as the async runtime and a dynamic number of futures to join.

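I had success using future::join3 on a fixed number of futures, but I'd like to multiplex more commands (the exact count doesn't have to be known at compile time, but even a larger fixed count would be an improvement).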

This is the working example using future::join3; it correctly prints Ok(Some("PONG")) Ok(Some("PONG")) Ok(Some("PONG"))

Cargo.toml

[package]
name = "redis_sample"
version = "0.1.0"
authors = ["---"]
edition = "2018"


[dependencies]
redis = { version = "0.17.0", features = ["aio", "tokio-comp", "tokio-rt-core"] }
tokio = { version = "0.2.23", features = ["full"] }
futures = "0.3.8"

src/main.rs

use futures::future;
use redis::RedisResult;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let results: (RedisResult<Option<String>>, RedisResult<Option<String>>, RedisResult<Option<String>>) = future::join3(
        redis::cmd("PING").query_async(&mut redis_connection.clone()),
        redis::cmd("PING").query_async(&mut redis_connection.clone()),
        redis::cmd("PING").query_async(&mut redis_connection),
    ).await;

    println!("{:?} {:?} {:?}", results.0, results.1, results.2);

    Ok(())
}

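Now I'd like to do the same, but with n commands (say 10, though ideally I'd like to tune the number against production performance). This is as far as I got, but I can't get past the borrow rules; I tried storing some intermediaries (the redis Cmd or the future itself) in a Vec to prolong their lifetimes, but that ran into other problems (multiple mut references).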

The Cargo.toml is the same; here is main.rs

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

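I get two compiler errors (creates a temporary which is freed while still in use), and I don't know how to move forward with this code. I'm not 100% sold on using Pin, but without it I couldn't even store the futures.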

Full compiler output:

   Compiling redis_sample v0.1.0 (/Users/gyfis/Documents/programming/rust/redis_sample)
error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:32
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                ^^^^^^^^^^^^^^^^^^                                              - temporary value is freed at the end of this statement
   |                                |
   |                                creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:69
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^   - temporary value is freed at the end of this statement
   |                                                                     |
   |                                                                     creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0716`.
error: could not compile `redis_sample`.

Any help is appreciated!

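This should work; I just extended the lifetime of redis_connection. Each iteration clones the connection into a local binding and moves it into an async block, so the future owns both the clone and the temporary Cmd instead of borrowing values that are dropped at the end of the statement.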

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(Box::pin(async move {
            redis::cmd("PING").query_async(&mut redis_connection).await
        }));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}
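
To see why moving the clone into an async move block helps, here is a dependency-free sketch. FakeConnection, ping, build_batch, NoopWaker and the tiny block_on are hypothetical stand-ins, not part of the redis or tokio APIs; the only thing carried over is the assumption that, like query_async, the call borrows the connection mutably for as long as its future lives.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for a cheaply cloneable multiplexed connection.
#[derive(Clone)]
struct FakeConnection {
    id: u32,
}

impl FakeConnection {
    // Like `query_async`, the returned future borrows `self` mutably.
    async fn ping(&mut self) -> String {
        format!("PONG from {}", self.id)
    }
}

// Waker that does nothing; enough for futures that are ready on first poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for this sketch only: polls in a loop until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

fn build_batch(conn: &FakeConnection, n: usize) -> Vec<Pin<Box<dyn Future<Output = String>>>> {
    let mut batch: Vec<Pin<Box<dyn Future<Output = String>>>> = Vec::new();
    for _ in 0..n {
        // `Box::pin(conn.clone().ping())` would be rejected here: the future
        // would borrow a temporary that dies at the end of the statement.
        let mut owned = conn.clone();
        // `async move` transfers `owned` into the future, so the borrow made
        // by `ping` points at data that lives inside the future itself.
        batch.push(Box::pin(async move { owned.ping().await }));
    }
    batch
}

fn main() {
    let conn = FakeConnection { id: 7 };
    for fut in build_batch(&conn, 3) {
        println!("{}", block_on(fut));
    }
}
```

The same ownership argument applies to the redis version above: the cloned connection and the Cmd both end up stored inside the future, so nothing it borrows is dropped before the Vec is.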

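Since you are inside a function body, you don't even need to box the futures. Every future in the Vec is produced by the same async block, so they all share one concrete type and type inference can do all the work: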

use futures::future;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(async move {
            redis::cmd("PING").query_async::<_, Option<String>>(&mut redis_connection).await
        });
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}
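
The reason the unboxed Vec type-checks can be shown without redis. In the sketch below, make_batch, join_all_blocking and NoopWaker are hypothetical names; join_all_blocking is a simplified busy-polling stand-in that only mimics the observable behaviour of join_all (outputs returned in input order) and is not how the futures crate implements it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; this sketch simply re-polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Every element comes from the same `async move` block, so the Vec has a
// single element type and no `Box<dyn Future>` is needed.
fn make_batch(n: usize) -> Vec<impl Future<Output = usize>> {
    let mut batch = Vec::new();
    for i in 0..n {
        batch.push(async move { i * 2 });
    }
    batch
}

// Simplified stand-in for `join_all`: polls every unfinished future in turn
// until all are done, then returns the outputs in input order.
fn join_all_blocking<F: Future>(futures: Vec<F>) -> Vec<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut pending: Vec<Option<Pin<Box<F>>>> =
        futures.into_iter().map(|f| Some(Box::pin(f))).collect();
    let mut outputs: Vec<Option<F::Output>> = pending.iter().map(|_| None).collect();

    while outputs.iter().any(|o| o.is_none()) {
        for (slot, out) in pending.iter_mut().zip(outputs.iter_mut()) {
            if let Some(fut) = slot {
                if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
                    *out = Some(value);
                    // Drop the finished future so it is never polled again.
                    *slot = None;
                }
            }
        }
    }

    outputs
        .into_iter()
        .map(|o| o.expect("every future completed"))
        .collect()
}

fn main() {
    let results = join_all_blocking(make_batch(4));
    println!("{:?}", results); // prints [0, 2, 4, 6]
}
```

Boxing becomes necessary again only when the Vec has to hold futures created by different async blocks or functions, because each of those has its own anonymous type.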