"cannot recursively call into `Core`" 尝试使用 Tokio 实现嵌套并发时
"cannot recursively call into `Core`" when trying to achieve nested concurrency using Tokio
我正在构建一个定期发出 HTTP 请求的服务。我正在使用 tokio::timer::Delay
作为定期触发器和 hyper 来进行 HTTP 调用。
同时使用它们会出现以下错误:
thread 'tokio-runtime-worker-1' panicked at 'cannot recursively call into `Core`', libcore/option.rs:960:5
我怎样才能完成这项工作?
以下是该服务的简化版本。
main.rs
extern crate futures;
extern crate hyper;
extern crate tokio;
extern crate tokio_core;
extern crate tokio_timer;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
use std::io::{self, Write};
fn main() {
let when = Instant::now() + Duration::from_secs(1);
tokio::run({
Delay::new(when)
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(move |_| {
let mut core = Core::new().unwrap();
let client = Client::new(&core.handle());
let uri = "http://httpbin.org/ip".parse().unwrap();
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body()
.for_each(|chunk| io::stdout().write_all(&chunk).map_err(From::from))
});
core.run(work).unwrap();
Ok(())
})
})
}
Cargo.toml
[dependencies]
futures = "0.1"
hyper = "0.11"
tokio-core = "0.1"
tokio-timer = "0.1"
tokio = "0.1"
serde = "1.0.19"
serde_derive = "1.0.19"
serde_json = "1.0.19"
hyper-tls = "0.1.3"
我看到的一个主要概念问题是您不应该创建任意 Core
s。你想尽可能多地分享这些,因为这就是 Tokio 在不同期货之间进行交流的方式。
创建单个核心并将其用于 HTTP 请求和整个命令是正确的做法。
超级 0.11
hyper 0.11 is not compatible with the tokio crate。相反,您需要使用 Tokio 的组件:
extern crate futures;
extern crate hyper;
extern crate tokio_core;
extern crate tokio_timer;
use futures::{Future, Stream};
use hyper::Client;
use std::{
io::{self, Write}, time::{Duration, Instant},
};
use tokio_core::reactor::Core;
use tokio_timer::Delay;
fn main() {
let when = Instant::now() + Duration::from_secs(1);
let mut core = Core::new().expect("Could not achieve criticality");
let handle = core.handle();
let command = Delay::new(when)
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(move |_| {
let client = Client::new(&handle);
let uri = "http://httpbin.org/ip".parse().unwrap();
client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body()
.for_each(|chunk| io::stdout().write_all(&chunk).map_err(From::from))
})
});
core.run(command).expect("Meltdown occurred");
}
[dependencies]
futures = "0.1"
hyper = "0.11.27"
tokio-core = "0.1.17"
tokio-timer = "0.2.3"
超级 0.12
使用 hyper 0.12,它看起来像这样:
extern crate hyper;
extern crate tokio;
use hyper::Client;
use std::{
error::Error, io::{self, Write}, time::{Duration, Instant},
};
use tokio::{
prelude::{Future, Stream}, timer::Delay,
};
type MyError = Box<Error + Send + Sync>;
fn main() {
let when = Instant::now() + Duration::from_secs(1);
let command = Delay::new(when).from_err::<MyError>().and_then(move |_| {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse().unwrap();
client.get(uri).from_err::<MyError>().and_then(|res| {
println!("Response: {}", res.status());
res.into_body()
.from_err::<MyError>()
.for_each(|chunk| io::stdout().write_all(&chunk).map_err(From::from))
})
});
tokio::run(command.map_err(|e| panic!("Error: {}", e)));
}
[dependencies]
hyper = "0.12.0"
tokio = "0.1.6"
我正在构建一个定期发出 HTTP 请求的服务。我正在使用 tokio::timer::Delay
作为定期触发器和 hyper 来进行 HTTP 调用。
同时使用它们会出现以下错误:
thread 'tokio-runtime-worker-1' panicked at 'cannot recursively call into `Core`', libcore/option.rs:960:5
我怎样才能完成这项工作?
以下是该服务的简化版本。
main.rs
extern crate futures;
extern crate hyper;
extern crate tokio;
extern crate tokio_core;
extern crate tokio_timer;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
use std::io::{self, Write};
fn main() {
let when = Instant::now() + Duration::from_secs(1);
tokio::run({
Delay::new(when)
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(move |_| {
let mut core = Core::new().unwrap();
let client = Client::new(&core.handle());
let uri = "http://httpbin.org/ip".parse().unwrap();
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body()
.for_each(|chunk| io::stdout().write_all(&chunk).map_err(From::from))
});
core.run(work).unwrap();
Ok(())
})
})
}
Cargo.toml
[dependencies]
futures = "0.1"
hyper = "0.11"
tokio-core = "0.1"
tokio-timer = "0.1"
tokio = "0.1"
serde = "1.0.19"
serde_derive = "1.0.19"
serde_json = "1.0.19"
hyper-tls = "0.1.3"
我看到的一个主要概念问题是您不应该创建任意 Core
s。你想尽可能多地分享这些,因为这就是 Tokio 在不同期货之间进行交流的方式。
创建单个核心并将其用于 HTTP 请求和整个命令是正确的做法。
超级 0.11
hyper 0.11 is not compatible with the tokio crate。相反,您需要使用 Tokio 的组件:
extern crate futures;
extern crate hyper;
extern crate tokio_core;
extern crate tokio_timer;
use futures::{Future, Stream};
use hyper::Client;
use std::{
io::{self, Write}, time::{Duration, Instant},
};
use tokio_core::reactor::Core;
use tokio_timer::Delay;
fn main() {
let when = Instant::now() + Duration::from_secs(1);
let mut core = Core::new().expect("Could not achieve criticality");
let handle = core.handle();
let command = Delay::new(when)
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(move |_| {
let client = Client::new(&handle);
let uri = "http://httpbin.org/ip".parse().unwrap();
client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body()
.for_each(|chunk| io::stdout().write_all(&chunk).map_err(From::from))
})
});
core.run(command).expect("Meltdown occurred");
}
[dependencies]
futures = "0.1"
hyper = "0.11.27"
tokio-core = "0.1.17"
tokio-timer = "0.2.3"
超级 0.12
使用 hyper 0.12,它看起来像这样:
extern crate hyper;
extern crate tokio;
use hyper::Client;
use std::{
error::Error, io::{self, Write}, time::{Duration, Instant},
};
use tokio::{
prelude::{Future, Stream}, timer::Delay,
};
type MyError = Box<Error + Send + Sync>;
fn main() {
let when = Instant::now() + Duration::from_secs(1);
let command = Delay::new(when).from_err::<MyError>().and_then(move |_| {
let client = Client::new();
let uri = "http://httpbin.org/ip".parse().unwrap();
client.get(uri).from_err::<MyError>().and_then(|res| {
println!("Response: {}", res.status());
res.into_body()
.from_err::<MyError>()
.for_each(|chunk| io::stdout().write_all(&chunk).map_err(From::from))
})
});
tokio::run(command.map_err(|e| panic!("Error: {}", e)));
}
[dependencies]
hyper = "0.12.0"
tokio = "0.1.6"