是否有一个 FuturesOrdered 替代方案可以一个一个地产生结果?

Is there a FuturesOrdered alternative that yields results one by one?

在 Rust 中,我有一堆我想并行执行的异步函数。处理这些函数的结果的顺序很重要。我还想在这些函数可用时检索它们的结果。

让我解释得不好

这里是FuturesOrdered的描述:

This "combinator" is similar to FuturesUnordered, but it imposes an order on top of the set of futures. While futures in the set will race to completion in parallel, results will only be returned in the order their originating futures were added to the queue.

到目前为止一切顺利。现在看这个例子:

let mut ft = FuturesOrdered::new();
ft.push(wait_n(1)); // wait_n sleeps
ft.push(wait_n(2)); // for the given
ft.push(wait_n(4)); // number of secs
ft.push(wait_n(3));
ft.push(wait_n(5));
let r = ft.collect::<Vec<u64>>().await;

因为 FuturesOrdered 等待 所有 期货完成;这是我得到的:

|--|        ++
|----|      ++
|--------|  ++
|------|    ++
|----------|++
            ++-> all results available here 

这就是我想要的:

|--|++
|----|++
|--------|++
|------|    ++
|----------|  ++
               

换句话说;我想等待 下一个 的未来;随着剩余的期货继续竞相完成。另请注意,即使任务 #4 在任务 #3 之前完成;由于初始订单,它在#3 之后处理。

如何获得像这样并发执行的期货流?我希望是这样的:

let mut ft = MagicalStreamOfOrderedFutures::new();
ft.push(wait_n(1));
ft.push(wait_n(2));
ft.push(wait_n(4));
ft.push(wait_n(3));
ft.push(wait_n(5));
while Some(result) = ft.next().await {
  // returns results in order at seconds 1,2,4,4,5
}

Since FuturesOrdered awaits until all futures are completed

它本身并不是那样做的。

你要求它是因为你collect-ing Vec。由于 StreamExt::collect 的重点是将 整个流 转换为 collection:

Transforms a stream into a collection, returning a future representing the result of that computation. The returned future will be resolved when the stream terminates.

它只能在所有期货结算后产生 collection。

如果您延迟访问流,它会在项目可用时生成项目:

let mut s = stream::FuturesOrdered::new();
s.push(future::lazy(|_| 1).boxed());
s.push(future::lazy(|_| panic!("never resolves")).boxed());

let f = s.next().await;
println!("{:?}", f);

works just fine,尽管第二个未来不可能解决。如果你尝试 collect 它,它会崩溃。

How can I get a stream of futures that are executed concurrently like this? I was hoping for something like this:

Exactly like that?

let mut s = stream::FuturesOrdered::new();
s.push(sleep(Duration::from_millis(100)));
s.push(sleep(Duration::from_millis(200)));
s.push(sleep(Duration::from_millis(400)));
s.push(sleep(Duration::from_millis(300)));
s.push(sleep(Duration::from_millis(500)));

let start = Instant::now();
while s.next().await.is_some() {
    println!("{:.2?}", Instant::now() - start);
}
101.49ms
200.98ms
400.94ms
400.96ms
501.40ms

(使用毫秒睡眠,因为 multi-second 睡眠往往会触发 playground 的超时)