How can I create a Tokio runtime inside another Tokio runtime without getting the error "Cannot start a runtime from within a runtime"?
I'm using rust_bert for summarising text. I need to set a model with rust_bert::pipelines::summarization::SummarizationModel::new, which fetches the model from the internet. It does this asynchronously using tokio, and the issue that (I think) I'm running into is that I am running a Tokio runtime within another Tokio runtime, as indicated by the error message:
Downloading https://cdn.huggingface.co/facebook/bart-large-cnn/config.json to "/home/(censored)/.cache/.rustbert/bart-cnn/config.json"
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/(censored)/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
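I've tried running the model fetching synchronously with tokio::task::spawn_blocking and tokio::task::block_in_place, but neither of them works for me. block_in_place gives the same error as if it weren't there, and spawn_blocking doesn't really seem to be of use to me.
I've also tried making summarize_text async, but that didn't help much. GitHub issue tokio-rs/tokio#2194 and the Reddit post "'Cannot start a runtime from within a runtime.' with Actix-Web And Postgresql" seem similar (same error message), but they weren't much help in finding a solution.
The code I've got issues with is as follows: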
use egg_mode::tweet;
use rust_bert::pipelines::summarization::SummarizationModel;

fn summarize_text(model: SummarizationModel, text: &str) -> String {
    let output = model.summarize(&[text]);
    // @TODO: output summarization
    match output.is_empty() {
        false => "FALSE".to_string(),
        true => "TRUE".to_string(),
    }
}

#[tokio::main]
async fn main() {
    let model = SummarizationModel::new(Default::default()).unwrap();
    let token = egg_mode::auth::Token::Bearer("obviously not my token".to_string());
    let tweet_id = 1221552460768202756; // example tweet
    println!("Loading tweet [{id}]", id = tweet_id);
    let status = tweet::show(tweet_id, &token).await;
    match status {
        Err(err) => println!("Failed to fetch tweet: {}", err),
        Ok(tweet) => {
            println!(
                "Original tweet:\n{orig}\n\nSummarized tweet:\n{sum}",
                orig = tweet.text,
                sum = summarize_text(model, &tweet.text)
            );
        }
    }
}
Solving the problem
Here is a reduced example:
use tokio; // 1.0.2

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    inner_example();
}
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.0.2/src/runtime/enter.rs:39:9
To avoid this, you need to run the code that creates the second Tokio runtime on a completely independent thread. The easiest way to do this is to use std::thread::spawn:
use std::thread;

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        inner_example();
    }).join().expect("Thread panicked")
}
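One way to see why a separate thread helps: the check behind this panic is tracked per thread, so a freshly spawned thread starts out "not inside a runtime". The sketch below is a toy model that uses only the standard library; it is not Tokio's actual implementation. The names toy_block_on, ENTERED, nested_on_same_thread and nested_on_new_thread are hypothetical, made up for illustration, and the toy returns an Err where Tokio would panic.

```rust
use std::cell::Cell;
use std::thread;

thread_local! {
    // One flag per OS thread: "is a (toy) runtime currently driving this thread?"
    static ENTERED: Cell<bool> = Cell::new(false);
}

/// Hypothetical stand-in for a runtime's `block_on`: marks the current
/// thread as entered, runs `work`, then clears the mark again.
/// (A real runtime would clear the mark in a drop guard so it is reset
/// even if `work` panics; the toy skips that.)
fn toy_block_on<T>(work: impl FnOnce() -> T) -> Result<T, &'static str> {
    let already_entered = ENTERED.with(|flag| flag.replace(true));
    if already_entered {
        // Tokio panics at this point; the toy reports an error instead.
        return Err("Cannot start a runtime from within a runtime");
    }
    let output = work();
    ENTERED.with(|flag| flag.set(false));
    Ok(output)
}

/// Nested call on the same thread: the inner call sees the flag and fails.
fn nested_on_same_thread() -> Result<Result<u32, &'static str>, &'static str> {
    toy_block_on(|| toy_block_on(|| 42))
}

/// Nested call moved to a fresh thread: that thread has its own flag,
/// and `join` hands the inner result back to the caller.
fn nested_on_new_thread() -> Result<Result<u32, &'static str>, &'static str> {
    toy_block_on(|| {
        thread::spawn(|| toy_block_on(|| 42))
            .join()
            .expect("Thread panicked")
    })
}
```

Called from a plain main, nested_on_same_thread() returns Ok(Err(..)) because the inner call finds the thread already entered, while nested_on_new_thread() returns Ok(Ok(42)). The second function also shows that join returns whatever the closure produced, which is how a value built on the helper thread (a loaded model, for instance) can get back to the caller.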
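For better performance, you may wish to use a threadpool instead of creating a new thread each time. Conveniently, Tokio itself provides such a threadpool via spawn_blocking: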
#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    tokio::task::spawn_blocking(|| {
        inner_example();
    }).await.expect("Task panicked")
}
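In some cases you don't need to actually create a second Tokio runtime and can instead reuse the parent runtime. To do so, you pass in a Handle to the outer runtime. You can optionally use a lightweight executor like futures::executor to block on the result, if you need to wait for the work to finish: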
use tokio::runtime::Handle; // 1.0.2

fn inner_example(handle: Handle) {
    futures::executor::block_on(async {
        handle
            .spawn(async {
                // Do work here
            })
            .await
            .expect("Task spawned in Tokio executor panicked")
    })
}

#[tokio::main]
async fn main() {
    let handle = Handle::current();
    tokio::task::spawn_blocking(|| {
        inner_example(handle);
    })
    .await
    .expect("Blocking task panicked")
}
See also:
- How to create a dedicated threadpool for CPU-intensive work in Tokio?
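Avoiding the problem
A better path is to avoid creating nested Tokio runtimes in the first place. Ideally, if a library uses an asynchronous executor, it would also offer the direct asynchronous function so that you could use your own executor.
It's worth looking at the API to see if there is a non-blocking alternative, and if not, raising an issue on the project's repository.
You may also be able to reorganize your code so that the Tokio runtimes are not nested but sequential: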
struct Data;

#[tokio::main]
async fn inner_example() -> Data {
    Data
}

#[tokio::main]
async fn core(_: Data) {}

fn main() {
    let data = inner_example();
    core(data);
}
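I had a similar issue loading a QA model into warp (a Tokio runtime). Sequential runtimes still weren't working for me, but I found my solution in the GitHub issues of rust-bert. The solution was simply to wrap the initial loading call in task::spawn_blocking. This is fine for me because I can't accept any requests before the model is loaded anyway. A snippet is below in case it helps others.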
use tokio::task;
use warp::Filter;

// QaModel, QaQuery, qa_model_config, qa_model and ask are defined
// elsewhere in the poster's project and are not shown here.

fn with_model(
    qa: QaModel, // alias for Arc<Mutex<QuestionAnsweringModel>>
) -> impl Filter<Extract = (QaModel,), Error = std::convert::Infallible> + Clone {
    // Hand every request its own clone of the shared handle.
    warp::any().map(move || qa.clone())
}

#[tokio::main]
async fn main() {
    env_logger::init();

    // NOTE: have to download the model before booting up.
    // The loading runs on Tokio's blocking pool, outside the async context.
    let qa_model: QaModel = task::spawn_blocking(move || {
        log::debug!("setting up qa model config");
        let c = qa_model_config();
        log::debug!("finished setting up qa model config");

        log::debug!("setting up qa model");
        let m = qa_model(c);
        log::debug!("finished setting up qa model");
        m
    })
    .await
    .expect("got model");

    let ask_handler = warp::path!("ask")
        .and(warp::get())
        .and(warp::query::<QaQuery>())
        .and(with_model(qa_model))
        .and_then(ask);

    warp::serve(ask_handler).run(([127, 0, 0, 1], 3030)).await;
}
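The shape of that snippet (load once off the async thread, then share one handle with every request) can be sketched with the standard library alone. This is only an illustration under stated assumptions: ToyModel, SharedModel, load_shared_model and handle_request are hypothetical names, ToyModel stands in for the real QuestionAnsweringModel, and std::thread::spawn stands in for task::spawn_blocking so that the sketch needs no external crates.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a model that is expensive to load.
struct ToyModel {
    name: String,
}

impl ToyModel {
    /// Pretend this blocks for a long time (download, disk I/O, ...).
    fn load() -> ToyModel {
        ToyModel { name: "toy-qa".to_string() }
    }

    fn answer(&self, question: &str) -> String {
        format!("{} has no answer for {:?}", self.name, question)
    }
}

/// Same shape as the `QaModel` alias in the snippet above.
type SharedModel = Arc<Mutex<ToyModel>>;

/// Load the model on a separate thread, wait for it, then wrap it so
/// that cheap clones of the handle can be given to each request.
fn load_shared_model() -> SharedModel {
    let model = thread::spawn(ToyModel::load)
        .join()
        .expect("model loading thread panicked");
    Arc::new(Mutex::new(model))
}

/// What a request handler would do with its clone of the handle.
fn handle_request(model: SharedModel, question: &str) -> String {
    // Bind the guard to a local so it is released before `model` is dropped.
    let guard = model.lock().expect("model mutex poisoned");
    guard.answer(question)
}
```

Each request gets its own Arc clone, mirroring what with_model does via qa.clone(); only the reference count is copied, never the model itself, and the Mutex serialises access to it.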