如何并发抓取未知结尾的分页网页?

How to concurrently crawl paginated webpages with unknown end?

我正在尝试使用 tokio 异步运行时在 Rust 中编写网络爬虫。我想异步 fetch/process 多个页面,但我也希望爬虫在到达末尾时停止(换句话说,如果没有任何东西可以爬)。到目前为止,我已经使用 futures::future::try_join_all 从我作为 Futures 提供的异步函数中获取集体结果,但这显然需要程序事先知道要抓取的总页数。例如:

async fn fetch(_url: String) -> Result<String, ()> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    Ok(String::from("foo"))
}

#[tokio::main]
async fn main() {
    let search_url = "https://example.com/?page={page_num}";

    let futures = (1..=3)
        .map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
        .map(|url| fetch(url));

    let _ = futures::future::try_join_all(futures).await.unwrap();
}

Rust Playground

在这个简单的示例中,我必须在实际获取页面之前知道要浏览的总页数 (1..=3)。我想要的是,不提供任何范围,并且有条件停止整个过程。 (例如,如果 HTML 结果包含“未找到”)

我调查了 futures::executor::block_on 但我不确定它是否可以用于此任务。

下面是使用 Stream and .buffered() 的大致方法:

use futures::{future, stream, StreamExt};

#[derive(Debug)]
struct Error;

async fn fetch_page(page: i32) -> Result<String, Error> {
    println!("fetching page: {}", page);

    // simulate loading pages
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    if page < 5 {
        // successfully got page
        Ok(String::from("foo"))
    } else {
        // page doesn't exist
        Err(Error)
    }
}

#[tokio::main]
async fn main() {
    let pages: Vec<String> = stream::iter(1..)
        .map(fetch_page)
        .buffered(10)
        .take_while(|page| future::ready(page.is_ok()))
        .map(|page| page.unwrap())
        .collect()
        .await;

    println!("pages: {:?}", pages);
}

我将详细介绍 main() 中的步骤:

运行 上面的代码打印出以下内容,表明它一次尝试 10 次,但只会 return 直到第一次失败:

fetching page: 1
fetching page: 2
fetching page: 3
fetching page: 4
fetching page: 5
fetching page: 6
fetching page: 7
fetching page: 8
fetching page: 9
fetching page: 10
pages: ["foo", "foo", "foo", "foo"]

这掩盖了一些可有可无的优点,例如处理非缺失页面错误或重试,但我希望这能为您打下良好的基础。在这些情况下,您可能会使用 TryStreamExt 上的方法,它专门处理 Result 的流。