如何使用 reqwest 执行并行异步 HTTP GET 请求?
How can I perform parallel asynchronous HTTP GET requests with reqwest?
The async example 很有用,但作为 Rust 和 Tokio 的新手,我正在努力研究如何一次执行 N 个请求,使用向量中的 URLs,并创建迭代器每个 URL 的响应 HTML 作为字符串。
这是怎么做到的?
并发请求
自要求 0.10 起:
use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]
const CONCURRENT_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = stream::iter(urls)
.map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
})
.buffer_unordered(CONCURRENT_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
})
.await;
}
stream::iter(urls)
获取一组字符串并将其转换为 Stream
。
.map(|url| {
运行 流中每个元素的异步函数,并将元素转换为新类型。
let client = &client;
async move {
显式引用 Client
并将引用(不是原来的 Client
)移动到匿名异步块中。
let resp = client.get(url).send().await?;
使用 Client
的连接池启动异步 GET 请求并等待请求。
resp.bytes().await
请求并等待响应字节。
.buffer_unordered(N);
将期货流转换为这些期货值的流,并发执行期货。
bodies
.for_each(|b| {
async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
})
.await;
将流转换回单个未来,打印出沿途接收到的数据量,然后等待未来完成。
另请参阅:
- Join futures with limited concurrency
- How to merge iterator of streams?
无限制执行
如果你愿意,你也可以将迭代器转换为未来的迭代器并使用 future::join_all
:
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = future::join_all(urls.into_iter().map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
}))
.await;
for b in bodies {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
}
我鼓励使用第一个示例,因为您通常希望限制并发性,buffer
和 buffer_unordered
有助于。
并行请求
并发请求通常就足够了,但有时您需要 并发请求。在这种情况下,您需要生成一个任务。
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]
const PARALLEL_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let urls = vec!["https://api.ipify.org"; 2];
let client = Client::new();
let bodies = stream::iter(urls)
.map(|url| {
let client = client.clone();
tokio::spawn(async move {
let resp = client.get(url).send().await?;
resp.bytes().await
})
})
.buffer_unordered(PARALLEL_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(Ok(b)) => println!("Got {} bytes", b.len()),
Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
}
})
.await;
}
主要区别是:
- 我们使用
tokio::spawn
在单独的 任务中执行工作。
- 我们必须给每个任务单独的
reqwest::Client
。作为 recommended,我们克隆一个共享客户端以使用连接池。
- 无法加入任务时还有一个错误案例。
另请参阅:
- What is the difference between concurrent programming and parallel programming?
- What is the difference between concurrency and parallelism?
- What is the difference between concurrency, parallelism and asynchronous methods?
如果可能的话,我建议使用 std async 和 rayon。它们现在都很成熟,并且非常容易上手,因为这里有异步代码{/* std 中的 */} 范围边界。您还可以使用 into/alongside tokio 功能集成 https://docs.rs/async-std/1.10.0/async_std/#features
The async example 很有用,但作为 Rust 和 Tokio 的新手,我正在努力研究如何一次执行 N 个请求,使用向量中的 URLs,并创建迭代器每个 URL 的响应 HTML 作为字符串。
这是怎么做到的?
并发请求
自要求 0.10 起:
use futures::{stream, StreamExt}; // 0.3.5
use reqwest::Client; // 0.10.6
use tokio; // 0.2.21, features = ["macros"]
const CONCURRENT_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = stream::iter(urls)
.map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
})
.buffer_unordered(CONCURRENT_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
})
.await;
}
stream::iter(urls)
获取一组字符串并将其转换为 Stream
。
.map(|url| {
运行 流中每个元素的异步函数,并将元素转换为新类型。
let client = &client; async move {
显式引用 Client
并将引用(不是原来的 Client
)移动到匿名异步块中。
let resp = client.get(url).send().await?;
使用 Client
的连接池启动异步 GET 请求并等待请求。
resp.bytes().await
请求并等待响应字节。
.buffer_unordered(N);
将期货流转换为这些期货值的流,并发执行期货。
bodies .for_each(|b| { async { match b { Ok(b) => println!("Got {} bytes", b.len()), Err(e) => eprintln!("Got an error: {}", e), } } }) .await;
将流转换回单个未来,打印出沿途接收到的数据量,然后等待未来完成。
另请参阅:
- Join futures with limited concurrency
- How to merge iterator of streams?
无限制执行
如果你愿意,你也可以将迭代器转换为未来的迭代器并使用 future::join_all
:
use futures::future; // 0.3.4
use reqwest::Client; // 0.10.1
use tokio; // 0.2.11
#[tokio::main]
async fn main() {
let client = Client::new();
let urls = vec!["https://api.ipify.org"; 2];
let bodies = future::join_all(urls.into_iter().map(|url| {
let client = &client;
async move {
let resp = client.get(url).send().await?;
resp.bytes().await
}
}))
.await;
for b in bodies {
match b {
Ok(b) => println!("Got {} bytes", b.len()),
Err(e) => eprintln!("Got an error: {}", e),
}
}
}
我鼓励使用第一个示例,因为您通常希望限制并发性,buffer
和 buffer_unordered
有助于。
并行请求
并发请求通常就足够了,但有时您需要 并发请求。在这种情况下,您需要生成一个任务。
use futures::{stream, StreamExt}; // 0.3.8
use reqwest::Client; // 0.10.9
use tokio; // 0.2.24, features = ["macros"]
const PARALLEL_REQUESTS: usize = 2;
#[tokio::main]
async fn main() {
let urls = vec!["https://api.ipify.org"; 2];
let client = Client::new();
let bodies = stream::iter(urls)
.map(|url| {
let client = client.clone();
tokio::spawn(async move {
let resp = client.get(url).send().await?;
resp.bytes().await
})
})
.buffer_unordered(PARALLEL_REQUESTS);
bodies
.for_each(|b| async {
match b {
Ok(Ok(b)) => println!("Got {} bytes", b.len()),
Ok(Err(e)) => eprintln!("Got a reqwest::Error: {}", e),
Err(e) => eprintln!("Got a tokio::JoinError: {}", e),
}
})
.await;
}
主要区别是:
- 我们使用
tokio::spawn
在单独的 任务中执行工作。 - 我们必须给每个任务单独的
reqwest::Client
。作为 recommended,我们克隆一个共享客户端以使用连接池。 - 无法加入任务时还有一个错误案例。
另请参阅:
- What is the difference between concurrent programming and parallel programming?
- What is the difference between concurrency and parallelism?
- What is the difference between concurrency, parallelism and asynchronous methods?
如果可能的话,我建议使用 std async 和 rayon。它们现在都很成熟,并且非常容易上手,因为这里有异步代码{/* std 中的 */} 范围边界。您还可以使用 into/alongside tokio 功能集成 https://docs.rs/async-std/1.10.0/async_std/#features