如何并发抓取未知结尾的分页网页?
How to concurrently crawl paginated webpages with unknown end?
我正在尝试使用 tokio 异步运行时在 Rust 中编写网络爬虫。我想异步 fetch/process 多个页面,但我也希望爬虫在到达末尾时停止(换句话说,如果没有任何东西可以爬)。到目前为止,我已经使用 futures::future::try_join_all 从我作为 Future
s 提供的异步函数中获取集体结果,但这显然需要程序事先知道要抓取的总页数。例如:
async fn fetch(_url: String) -> Result<String, ()> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(String::from("foo"))
}
#[tokio::main]
async fn main() {
let search_url = "https://example.com/?page={page_num}";
let futures = (1..=3)
.map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
.map(|url| fetch(url));
let _ = futures::future::try_join_all(futures).await.unwrap();
}
在这个简单的示例中,我必须在实际获取页面之前知道要浏览的总页数 (1..=3
)。我想要的是,不提供任何范围,并且有条件停止整个过程。 (例如,如果 HTML 结果包含“未找到”)
我调查了 futures::executor::block_on 但我不确定它是否可以用于此任务。
下面是使用 Stream
and .buffered()
的大致方法:
use futures::{future, stream, StreamExt};
#[derive(Debug)]
struct Error;
async fn fetch_page(page: i32) -> Result<String, Error> {
println!("fetching page: {}", page);
// simulate loading pages
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
if page < 5 {
// successfully got page
Ok(String::from("foo"))
} else {
// page doesn't exist
Err(Error)
}
}
#[tokio::main]
async fn main() {
let pages: Vec<String> = stream::iter(1..)
.map(fetch_page)
.buffered(10)
.take_while(|page| future::ready(page.is_ok()))
.map(|page| page.unwrap())
.collect()
.await;
println!("pages: {:?}", pages);
}
我将详细介绍 main()
中的步骤:
stream::iter(1..)
创建一个无限 Stream
整数代表每个页码
.map(fetch_page)
当然会为每个页码调用fetch_page
.buffered(10)
这将允许最多 10 个 fetch_page
调用同时发生,并将保留原始顺序
.take_while(|page| future::ready(page.is_ok()))
will keep the stream going until a fetch_page
returns an error, it uses futures::future::ready
因为传递给 take_while
的函数必须 return 一个 future
.map(|page| page.unwrap())
将拉出成功的页面,它不会崩溃,因为我们知道发生任何错误时流都会停止
.collect()
基本上与迭代器做同样的事情,除了你必须 .await
it
运行 上面的代码打印出以下内容,表明它一次尝试 10 次,但只会 return 直到第一次失败:
fetching page: 1
fetching page: 2
fetching page: 3
fetching page: 4
fetching page: 5
fetching page: 6
fetching page: 7
fetching page: 8
fetching page: 9
fetching page: 10
pages: ["foo", "foo", "foo", "foo"]
这掩盖了一些可有可无的优点,例如处理非缺失页面错误或重试,但我希望这能为您打下良好的基础。在这些情况下,您可能会使用 TryStreamExt
上的方法,它专门处理 Result
的流。
我正在尝试使用 tokio 异步运行时在 Rust 中编写网络爬虫。我想异步 fetch/process 多个页面,但我也希望爬虫在到达末尾时停止(换句话说,如果没有任何东西可以爬)。到目前为止,我已经使用 futures::future::try_join_all 从我作为 Future
s 提供的异步函数中获取集体结果,但这显然需要程序事先知道要抓取的总页数。例如:
async fn fetch(_url: String) -> Result<String, ()> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(String::from("foo"))
}
#[tokio::main]
async fn main() {
let search_url = "https://example.com/?page={page_num}";
let futures = (1..=3)
.map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
.map(|url| fetch(url));
let _ = futures::future::try_join_all(futures).await.unwrap();
}
在这个简单的示例中,我必须在实际获取页面之前知道要浏览的总页数 (1..=3
)。我想要的是,不提供任何范围,并且有条件停止整个过程。 (例如,如果 HTML 结果包含“未找到”)
我调查了 futures::executor::block_on 但我不确定它是否可以用于此任务。
下面是使用 Stream
and .buffered()
的大致方法:
use futures::{future, stream, StreamExt};
#[derive(Debug)]
struct Error;
async fn fetch_page(page: i32) -> Result<String, Error> {
println!("fetching page: {}", page);
// simulate loading pages
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
if page < 5 {
// successfully got page
Ok(String::from("foo"))
} else {
// page doesn't exist
Err(Error)
}
}
#[tokio::main]
async fn main() {
let pages: Vec<String> = stream::iter(1..)
.map(fetch_page)
.buffered(10)
.take_while(|page| future::ready(page.is_ok()))
.map(|page| page.unwrap())
.collect()
.await;
println!("pages: {:?}", pages);
}
我将详细介绍 main()
中的步骤:
stream::iter(1..)
创建一个无限Stream
整数代表每个页码.map(fetch_page)
当然会为每个页码调用fetch_page
.buffered(10)
这将允许最多 10 个fetch_page
调用同时发生,并将保留原始顺序.take_while(|page| future::ready(page.is_ok()))
will keep the stream going until afetch_page
returns an error, it usesfutures::future::ready
因为传递给take_while
的函数必须 return 一个 future.map(|page| page.unwrap())
将拉出成功的页面,它不会崩溃,因为我们知道发生任何错误时流都会停止.collect()
基本上与迭代器做同样的事情,除了你必须.await
it
运行 上面的代码打印出以下内容,表明它一次尝试 10 次,但只会 return 直到第一次失败:
fetching page: 1
fetching page: 2
fetching page: 3
fetching page: 4
fetching page: 5
fetching page: 6
fetching page: 7
fetching page: 8
fetching page: 9
fetching page: 10
pages: ["foo", "foo", "foo", "foo"]
这掩盖了一些可有可无的优点,例如处理非缺失页面错误或重试,但我希望这能为您打下良好的基础。在这些情况下,您可能会使用 TryStreamExt
上的方法,它专门处理 Result
的流。