如何使用高阶异步函数来过滤 Vec?

How do I use an higher order async function to filter a Vec?

通过 async 谓词过滤,“简单”方式

一种方法是 join_all!() Future 计算每个项目的过滤器。然后根据这些进行同步过滤:

let arr = vec![...]
let filters = join_all!(arr.iter().map(|it| async { predicate(it).await })
let filtered = arr.enumerate().filter(|index, item| filters[index]).collect::<Vec<_>>();

然而,探索 Rust,有一个 cleaner 方法通过 futures::stream::iter 迭代器:

let filtered = futures::stream::iter(vec![...])
  .filter(|item| async { predicate(item).await })
  .collect::<Vec<_>>
  .await

到目前为止一切都很好。


可配置过滤器:麻烦开始了

如果我们想使用功能性 API 来使 predicate 易于配置怎么办?

在这种情况下,我们的调用将如下所示:

let filtered = futures::stream::iter(vec![...])
  .filter(by_length(4)) // neat!
  .collect::<Vec<_>>
  .await

谓词:

fn by_length(min_length: usize) -> impl FnMut(&i32) -> Future<Output = bool> {
    |n| async { query_length(n).await > min_length }
}

async fn query_length(n: &i32) -> usize {
    // pretend we're making a network request to fetch `len`...
    // for easy reproducibility's sake this will work here
    n.to_string().len()
}

不幸的是,编译器不再高兴了:它抱怨 Future 需要 dyn 关键字。并且,在添加 dyn 之后,它抱怨它不是 Sized,就像在这个最小复制中一样:

use futures::future::Future;

#[tokio::main]
async fn main() {
    let arr = vec![10, 100, 1000];
    
    let filtered = futures::stream::iter(arr.into_iter())
      .filter(by_length(3))
      .collect::<Vec<_>>()
      .await;
      
    println!("{:?}", filtered); // should print [100, 1000]
}

fn by_length(min_length: usize) -> impl FnMut(&i32) -> Future<Output = bool> {
    |n| async { query_length(n).await > min_length }
}

// yeah it doesn't need to be async in this case, but let's pretend
async fn query_length(n: &i32) -> usize {
    n.to_string().len()
}

错误:

   Compiling playground v0.0.1 (/playground)
error[E0277]: the size for values of type `(dyn futures::Future<Output = bool> + 'static)` cannot be known at compilation time
  --> src/main.rs:16:9
   |
16 |     |n| async { query_length(n).await > min_length }
   |         ^ doesn't have a size known at compile-time
   |
   = help: the trait `Sized` is not implemented for `(dyn futures::Future<Output = bool> + 'static)`
   = note: the return type of a function must have a statically known size

问题

虽然你不能 return 来自 impl FnMutimpl Future,但你可以 return 盒装未来,即 dyn Future 必须是装箱是因为它在 return 位置。在借用了一些俄罗斯方块检查器之后,我们得出了这个:

fn by_length(min_length: usize) -> impl FnMut(&i32) -> Pin<Box<dyn Future<Output = bool>>> {
    move |&n| Box::pin(async move { query_length(&n).await > min_length })
}

Playground


However we only need a single typed case here, for which we should be able to generate inline-able machine code. How can Rust be instructed to do that?

我认为目前这不可能,至少如果你调用 async fnquery_length() 是不可能的。考虑一个手动编写的实现:

fn by_length(min_length: usize) -> impl FnMut(&i32) -> ByLength {
    move |&n| ByLength { n }
}

现在,我们如何定义ByLength?它必须实现 Future,它的 poll() 只是传输轮询 query_length(n) 的结果。但是 query_length(n) 编辑的未来 return 可能会暂停多次,因此 ByLength 必须 存储 未来,以便它可以根据需要多次轮询- 例如:

struct ByLength {
    n: i32,
    query_fut: Option<???>,
}

impl Future for ByLength {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        if self.query_fut.is_none() {
            self.query_fut = Some(query_length(self.n));
        }
        self.query_fut.unwrap().poll(cx)
    }
}

但现在问题变得很明显:没有类型可以替代 ???,因为 query_length() 是一个异步函数,return 是匿名类型的未来。使 ByLength 泛型不起作用,因为我们又回到了闭包不能 return 它提供的泛型类型的问题。我们想要的签名需要 higher-kinded 类型:

fn by_length(min_length: usize) -> impl for<T> FnMut(&i32) -> ByLength<T> {
    move |&n| ByLength { n }
}

...但是如果我们有那个,我们可以直接使用 query_length()

fn by_length(min_length: usize) -> impl for<T: Future<Output = usize>> FnMut(&i32) -> T {
    move |&n| async move { by_length(&n) }
}