How to properly do batches of HTTP requests using Tokio and Surf?
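What I am trying to do is save all of the `JoinHandle`s in a vector and await them after every X iterations.
I am doing this because the endpoint I am sending requests to returns a 429 error if I send too many requests within a certain time frame.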
#[tokio::main]
pub async fn get_collection_stats(city_name: String) -> Result<(serde_json::Value, surf::StatusCode), Box<dyn Error>> {
    let endpoint = format!("https://some-city-api.org/{}", city_name);
    let mut city_res = surf::get(&endpoint).await;
    let mut res: surf::Response = match city_res {
        Ok(value) => value,
        Err(e) => { panic!("Error: {}", e) }
    };
    let stats: serde_json::Value = match res.body_json().await.ok() {
        Some(val) => val,
        None => serde_json::from_str("{}").unwrap()
    };
    Ok((stats, res.status()))
}

let mut count = 0;
let mut requests: Vec<_> = Vec::new();
for name in city_names {
    if count < 5 {
        let mut stats = tokio::task::spawn_blocking(|| {
            match get_data_about_city(String::from(name)) {
                Ok(value) => value,
                Err(_) => serde_json::from_str("{}").unwrap()
            }
        });
        requests.push(stats);
        count += 1;
    } else {
        for task in requests {
            dbg!(task.await);
        }
        count = 0;
        break
    }
}
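This is what I have so far. It works, but only when I `break` in the `else` branch. I want to be able to process the requests in batches of 5 without breaking out of the loop. Without the `break`, I get this error: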
error[E0382]: borrow of moved value: `requests`
--> src\main.rs:109:13
|
87 | let mut requests: Vec<_> = Vec::new();
| ------------ move occurs because `requests` has type `Vec<tokio::task::JoinHandle<(serde_json::Value, StatusCode)>>`, which does not implement the `Copy` trait
...
109 | requests.push(stats);
| ^^^^^^^^^^^^^^^^^^^^ value borrowed here after move
...
112 | for task in requests {
| --------
| |
| `requests` moved due to this implicit call to `.into_iter()`, in previous iteration of loop
| help: consider borrowing to avoid moving into the for loop: `&requests`
|
note: this function takes ownership of the receiver `self`, which moves `requests`
--> C:\Users\Zed\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\iter\traits\collect.rs:234:18
|
234 | fn into_iter(self) -> Self::IntoIter;
| ^^^^
Okay, I solved the move issue by iterating over `&requests`. Now I am running into this error instead:
|
113 | dbg!(task.await);
| ^^^^^^^^^^ `&tokio::task::JoinHandle<(serde_json::Value, StatusCode)>` is not a future
|
= help: the trait `Future` is not implemented for `&tokio::task::JoinHandle<(serde_json::Value, StatusCode)>`
= note: `Future` is implemented for `&mut tokio::task::JoinHandle<(serde_json::Value, surf::StatusCode)>`, but not for `&tokio::task::JoinHandle<(serde_json::Value, surf::StatusCode)>`
How should I go about doing what I want to do?
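Both errors come down to ownership: `for task in requests` consumes the vector, while `for task in &requests` only yields `&JoinHandle`, which cannot be awaited. One possible way around both is `Vec::drain(..)`, which hands out owned handles and leaves the vector empty but still usable for the next batch. The sketch below illustrates only that ownership pattern. It is an assumption-laden stand-in: it uses `std::thread::JoinHandle` instead of Tokio tasks so that it has no dependencies, and `run_in_batches` and the placeholder work (taking the string length) are hypothetical names and logic, not part of the original code. With Tokio the inner loop would presumably read `for task in requests.drain(..) { task.await }`.

```rust
use std::thread::{self, JoinHandle};

/// Processes `inputs` in batches of `batch_size`, waiting for each batch to
/// finish before starting the next one.
/// NOTE: std threads stand in for Tokio tasks (assumption for illustration).
fn run_in_batches(inputs: Vec<String>, batch_size: usize) -> Vec<usize> {
    let mut handles: Vec<JoinHandle<usize>> = Vec::new();
    let mut results = Vec::new();

    for input in inputs {
        // Placeholder work: the real code would perform the HTTP request here.
        handles.push(thread::spawn(move || input.len()));

        if handles.len() == batch_size {
            // drain(..) yields owned handles and empties the vector in place,
            // so `handles` is not moved and can be reused on the next iteration.
            for handle in handles.drain(..) {
                results.push(handle.join().expect("worker panicked"));
            }
        }
    }

    // Wait for the final batch, which may hold fewer than `batch_size` handles.
    for handle in handles.drain(..) {
        results.push(handle.join().expect("worker panicked"));
    }
    results
}
```

Because the handles are joined in the order they were pushed, the results come back in input order. No counter variable is needed, since `handles.len()` already tracks the size of the current batch.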
I figured it out. I ended up using the `futures` crate. The flow looks something like this:
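use futures::stream::{FuturesUnordered, StreamExt};

// Build the request URLs up front.
let mut urls = Vec::new();
for name in city_names {
    urls.push(format!("https://some-city-api.org/{}", name));
}

let mut futs = FuturesUnordered::new();
for url in urls {
    futs.push(surf::get(url));
    // 50 requests reached, await everything in the buffer
    if futs.len() == 50 {
        while let Some(res) = futs.next().await {
            // Do something with the response
        }
    }
}
// Await whatever is left over from the final, partial batch
while let Some(res) = futs.next().await {
    // Do something with the response
}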
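Another way to make sure the final, smaller batch is treated like every other batch is to split the city names into fixed-size groups before sending anything. Here is a minimal sketch of that grouping step using only the standard library. `batch_urls` is a hypothetical helper name, and it assumes the names are available as a slice of `&str`. Each inner vector could then be pushed into a `FuturesUnordered` and drained before moving on to the next one.

```rust
/// Builds the request URLs and groups them into batches of at most
/// `batch_size` entries. `batch_urls` is a hypothetical helper for illustration.
fn batch_urls(city_names: &[&str], batch_size: usize) -> Vec<Vec<String>> {
    city_names
        // `chunks` panics on a size of 0, so fall back to batches of 1.
        .chunks(batch_size.max(1))
        .map(|chunk| {
            chunk
                .iter()
                .map(|name| format!("https://some-city-api.org/{}", name))
                .collect::<Vec<String>>()
        })
        .collect()
}
```

With this shape, the last chunk is simply shorter than the others, so there is no separate leftover case to remember. A pause between batches could also be added in the loop that consumes them if the endpoint still answers with 429.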