Rust,如何执行基本的递归异步?

Rust, how to perform basic recursive async?

我只是在做一些快速实验以尝试学习 Rust 语言,我已经完成了一些成功的异步测试,这是我的起点:

use async_std::task;
use futures;
use std::time::SystemTime;

fn main() {
    let now = SystemTime::now();
    task::block_on(async {
        let mut fs = Vec::new();
        let sum = 100000000;
        let chunks: u64 = 5; //only valid for factors of sum
        let chunk_size: u64 = sum/chunks;
        for n in 1..=chunks {
            fs.push(task::spawn(async move {
                add_range((n - 1) * chunk_size + 1, n * chunk_size + 1)
            }));
        }
        let vals = futures::future::join_all(fs).await;
        // 5000000050000000 for this configuration of inputs
        println!("{}", vals.iter().sum::<u64>());
    });
    println!("{}ms", now.elapsed().unwrap().as_millis());
}

fn add_range(start: u64, end: u64) -> u64 {
    println!("{}, {}", start, end);
    let mut total: u64 = 0;
    for n in start..end {
        total += n;
    }
    return total;
}

通过更改 chunks 的值,您可以更改 task::spawn 的数量。现在,我希望 add_range 函数是递归的,并根据输入继续分叉工作人员,而不是一组扁平的工作人员,但是在编译器错误之后,我让自己非常纠结:

use async_std::task;
use futures;
use std::future::Future;
use std::pin::Pin;

fn main() {
    let pin_box_u64 = task::block_on(add_range(0, 10, 10, 1, 1001));
    println!("{}", pin_box_u64/*how do i get u64 out of this*/)
}

// recursively calls itself in a branching tree structure
// forking off more worker threads
async fn add_range(
    depth: u64,
    chunk_split: u64,
    chunk_size: u64,
    start: u64,
    end: u64,
) -> Pin<Box<dyn Future<Output = u64>>> {
    println!("{}, {}, {}", depth, start, end);
    // if the range of start to end is more than the allowed
    // chunk_size then fork off more workers dividing
    // the work up further.
    if end - start > chunk_size {
        let mut fs = Vec::new();
        let next_chunk_size = (end - start) / chunk_split;
        for n in 0..chunk_split {
            let s = start + (next_chunk_size * n);
            let mut e = start + (next_chunk_size * (n + 1));
            if e > end {
                e = end;
            }
            // spawn more workers
            fs.push(task::spawn(add_range(depth + 1, chunk_split, chunk_size, s, e)));
        }
        return Box::pin(async move {
            // join workers back up and do joining sum. 
            return futures::future::join_all(fs).await.iter().map(/*how do i get u64s out of here*/).sum::<u64>();
        });
    } else {
        // else the work is less than the allowed chunk_size
        // so lets now do the actual sum for my chunk 
        let mut total: u64 = 0;
        for n in start..end {
            total += n;
        }
        return Box::pin(async move { total });
    }
}

我已经尝试了一段时间,但我觉得我越来越迷失在编译器错误中。

您需要将 returned future 装箱,否则编译器无法确定 return 类型的大小。

可以在此处找到其他上下文:https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html

use std::pin::Pin;

use async_std::task;
use futures::Future;
use futures::FutureExt;

fn main() {
    let pin_box_u64 = task::block_on(add_range(0, 10, 10, 1, 1001));
    println!("{}", pin_box_u64)
}

// recursively calls itself in a branching tree structure
// forking off more worker threads
fn add_range(
    depth: u64,
    chunk_split: u64,
    chunk_size: u64,
    start: u64,
    end: u64,
) -> Pin<Box<dyn Future<Output = u64> + Send + 'static>> {
    println!("{}, {}, {}", depth, start, end);
    // if the range of start to end is more than the allowed
    // chunk_size then fork off more workers dividing
    // the work up further.
    if end - start > chunk_size {
        let mut fs = Vec::new();
        let next_chunk_size = (end - start) / chunk_split;
        for n in 0..chunk_split {
            let s = start + (next_chunk_size * n);
            let mut e = start + (next_chunk_size * (n + 1));
            if e > end {
                e = end;
            }
            // spawn more workers
            fs.push(task::spawn(add_range(
                depth + 1,
                chunk_split,
                chunk_size,
                s,
                e,
            )));
        }
        // join workers back up and do joining sum.
        return futures::future::join_all(fs)
            .map(|v| v.iter().sum::<u64>())
            .boxed();
    } else {
        // else the work is less than the allowed chunk_size
        // so lets now do the actual sum for my chunk
        let mut total: u64 = 0;
        for n in start..end {
            total += n;
        }
        return futures::future::ready(total).boxed();
    }
}