为什么即使在未来解决之后,在循环中使用带有 Tokio 期货的克隆超级客户端也会阻塞?
Why does using a cloned hyper client with Tokio futures in a loop block even after the future resolves?
我有一个定期更新缓存数据的服务。每 N 秒它会使用循环(tokio::run(future_update(http_client.clone()))
)触发未来,但它不会返回到未来解决的父函数。循环阻塞,我只得到一次迭代。
当我创建一个新的超 HTTP 客户端而不是传递一个克隆的客户端时,一切正常。它也不起作用 Arc<Client>
。
pub fn trigger_cache_reload(http_client: Arc<Client<HttpConnector, Body>>) {
let load_interval_sec = get_load_interval_sec(conf.load_interval_seconds.clone());
std::thread::spawn(move || loop {
let http_client = http_client.clone();
info!("Woke up");
tokio::run(pipeline(http_client));
info!(
"Pipeline run complete. Huuhh Now I need sleep of {} secs. Sleeping",
load_interval_sec
);
std::thread::sleep(std::time::Duration::from_secs(load_interval_sec));
});
}
fn pipeline(
client: Arc<Client<HttpConnector, Body>>,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let res = fetch_message_payload() //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
.map_err(Error::from)
.and_then(|_| {
//let client = hyper::Client::builder().max_idle_per_host(1).build_http();
//if i create new client here every time and use it then all working is fine.
refresh_cache(client) //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
.map_err(Error::from)
.and_then(|arg| {
debug!("refresh_cache completed");
Ok(arg)
})
});
let res = res.or_else(|e| {
error!("error {:?}", e);
Ok(())
});
Box::new(res)
}
调用 trigger_cache_reload
一次后,我收到 "woke up"
日志消息。在成功完成未来的一段时间后,我还会收到 "refresh_cache completed"
日志消息。我没有收到带或不带 Arc
的 "sleeping"
日志消息。
如果我每次都在未来创建一个新客户端,我就能得到 "sleeping"
日志消息。
tokio::run
每次调用都会创建一个全新的事件循环和线程池(reactor + executor)。这真的不是你想做的。
超级客户端会将其状态绑定到前一个事件循环,如果轮询新事件循环则无法取得进展,因为旧事件循环将在 run
完成后被销毁。这就是为什么新客户端可以工作,但您不能重复使用旧客户端。
这里有两种解决方案:
如果您的应用程序的其余部分不使用 tokio,我将只使用同步 reqwest::Client。如果您不需要大量并发,同步解决方案在这里会容易得多。
如果您使用的是 tokio,请使用 tokio::spawn inside another Future together with tokio_timer::Timeout 到 运行 检查,然后在事件循环中等待指定的时间。
async/await 示例
新的 async/await 支持使这样的代码更容易编写。
此示例目前仅适用于 nightly
编译器 tokio-0.3.0-alpha.2
和当前 hyper
master 分支:
[dependencies]
tokio = "0.3.0-alpha.2"
tokio-timer = "0.3.0-alpha.2"
hyper = { git = "https://github.com/hyperium/hyper.git" }
use tokio::timer::Interval;
use hyper::{Client, Uri};
use std::time::Duration;
#[tokio::main]
async fn main() {
let client = Client::new();
let second_interval = 120;
let mut interval = Interval::new_interval(Duration::from_secs(second_interval));
let uri = Uri::from_static("http://httpbin.org/ip");
loop {
let res = Client.get(uri.clone()).await.unwrap();
// Do what you need to with the response...
interval.next().await;
}
}
我有一个定期更新缓存数据的服务。每 N 秒它会使用循环(tokio::run(future_update(http_client.clone()))
)触发未来,但它不会返回到未来解决的父函数。循环阻塞,我只得到一次迭代。
当我创建一个新的超 HTTP 客户端而不是传递一个克隆的客户端时,一切正常。它也不起作用 Arc<Client>
。
pub fn trigger_cache_reload(http_client: Arc<Client<HttpConnector, Body>>) {
let load_interval_sec = get_load_interval_sec(conf.load_interval_seconds.clone());
std::thread::spawn(move || loop {
let http_client = http_client.clone();
info!("Woke up");
tokio::run(pipeline(http_client));
info!(
"Pipeline run complete. Huuhh Now I need sleep of {} secs. Sleeping",
load_interval_sec
);
std::thread::sleep(std::time::Duration::from_secs(load_interval_sec));
});
}
fn pipeline(
client: Arc<Client<HttpConnector, Body>>,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let res = fetch_message_payload() //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
.map_err(Error::from)
.and_then(|_| {
//let client = hyper::Client::builder().max_idle_per_host(1).build_http();
//if i create new client here every time and use it then all working is fine.
refresh_cache(client) //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
.map_err(Error::from)
.and_then(|arg| {
debug!("refresh_cache completed");
Ok(arg)
})
});
let res = res.or_else(|e| {
error!("error {:?}", e);
Ok(())
});
Box::new(res)
}
调用 trigger_cache_reload
一次后,我收到 "woke up"
日志消息。在成功完成未来的一段时间后,我还会收到 "refresh_cache completed"
日志消息。我没有收到带或不带 Arc
的 "sleeping"
日志消息。
如果我每次都在未来创建一个新客户端,我就能得到 "sleeping"
日志消息。
tokio::run
每次调用都会创建一个全新的事件循环和线程池(reactor + executor)。这真的不是你想做的。
超级客户端会将其状态绑定到前一个事件循环,如果轮询新事件循环则无法取得进展,因为旧事件循环将在 run
完成后被销毁。这就是为什么新客户端可以工作,但您不能重复使用旧客户端。
这里有两种解决方案:
如果您的应用程序的其余部分不使用 tokio,我将只使用同步 reqwest::Client。如果您不需要大量并发,同步解决方案在这里会容易得多。
如果您使用的是 tokio,请使用 tokio::spawn inside another Future together with tokio_timer::Timeout 到 运行 检查,然后在事件循环中等待指定的时间。
async/await 示例
新的 async/await 支持使这样的代码更容易编写。
此示例目前仅适用于 nightly
编译器 tokio-0.3.0-alpha.2
和当前 hyper
master 分支:
[dependencies]
tokio = "0.3.0-alpha.2"
tokio-timer = "0.3.0-alpha.2"
hyper = { git = "https://github.com/hyperium/hyper.git" }
use tokio::timer::Interval;
use hyper::{Client, Uri};
use std::time::Duration;
#[tokio::main]
async fn main() {
let client = Client::new();
let second_interval = 120;
let mut interval = Interval::new_interval(Duration::from_secs(second_interval));
let uri = Uri::from_static("http://httpbin.org/ip");
loop {
let res = Client.get(uri.clone()).await.unwrap();
// Do what you need to with the response...
interval.next().await;
}
}