在异步环境中使用 crate notify
Using crate notify in async environment
我正在使用 crate 通知来监视文件更改。
我的问题是,我在异步环境中使用它。今天我意识到,我的“通知”线程正在使用 100% CPU...
我需要示例帮助,如何在异步世界中使用此同步箱。
我生成了一个新的异步线程,运行 里面有一个循环。在循环中,我检查是否有新通知可用,然后让出以启用其他异步任务 运行.
而正是在这里,我消耗了 100% 的 CPU。如何正确执行此操作?
在我看来,阻塞 recv() 是不可能的,因为我想对 CTRL-C 做出反应。
我需要睡觉而不是屈服吗?
tokio::select! {
_ = async {
loop {
if let Ok(DebouncedEvent::Create(path)) = watcher_rx.try_recv() {
process_create(path).await;
}
actix_rt::task::yield_now().await;
}
} => {},
_ = actix_rt::signal::ctrl_c() => {},
}
我重构了我的代码并围绕通知箱创建了一个包装器。我整合了@stepan 的想法。
但是在 spawn_blocking 中我不能使用异步函数。因为我在异步环境中需要这个,所以我使用异步通道...
mod mywatcher {
use async_std::path::Path;
use notify::Error;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub use notify::DebouncedEvent;
pub use notify::RecursiveMode;
pub async fn watcher(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<MyWatcher, Error> {
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
let notify = notify::watcher(notify_tx, delay)?;
tokio::task::spawn_blocking(move || async {
process_changes(Arc::new(Mutex::new(notify_rx)), tx).await
})
.await
.unwrap();
Ok(MyWatcher { notify })
}
async fn process_changes(
notify_rx: Arc<Mutex<std::sync::mpsc::Receiver<DebouncedEvent>>>,
watcher_tx: Sender<DebouncedEvent>,
) {
println!("Process Changes");
loop {
println!("In loop and waiting...");
let rx = notify_rx.lock().await;
let event = rx.recv();
if event.is_err() {
return;
}
let event = event.unwrap();
println!("EVENT {:?}", event);
watcher_tx.send(event).await.unwrap();
}
}
pub struct MyWatcher {
notify: notify::FsEventWatcher,
}
impl MyWatcher {
pub async fn watch<P: AsRef<Path>>(
&mut self,
p: P,
recursive_mode: RecursiveMode,
) -> Result<(), Error> {
use notify::Watcher;
self.notify.watch(p.as_ref(), recursive_mode)
}
pub async fn unwatch<P: AsRef<Path>>(&mut self, p: P) -> Result<(), Error> {
use notify::Watcher;
self.notify.unwatch(p.as_ref())
}
}
}
#[tokio::main]
async fn main() {
use mywatcher::RecursiveMode;
use std::time::Duration;
use tokio::sync::mpsc;
let (watcher_tx, watcher_rx) = mpsc::channel(10);
let mut watcher = mywatcher::watcher(watcher_tx, Duration::from_secs(10))
.await
.expect("create watcher failed");
watcher
.watch(
"/Develope/async-notify",
RecursiveMode::NonRecursive,
)
.await
.expect("watch failed");
for i in 1..30 {
println!("waiting...");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
watcher
.unwatch("/Develope/async-notify")
.await
.expect("unwatch failed");
}
Cargo.toml 依赖关系:
tokio = { version = "1", features = [
"macros",
"rt-multi-thread",
"sync",
"time",
] }
notify = "4.0.17"
async-std = "1.10.0"
使用单独的通知任务作为可能的解决方案之一。在这种情况下不需要 select
:
tokio::task::spawn_blocking(move || {
loop {
if let Ok(DebouncedEvent::Create(path)) = rx.recv() {
tokio::spawn(process_create(path));
} else {
break;
}
}
});
actix_rt::signal::ctrl_c().await;
如果“process_create”不能在那个地方直接调用(例如,实际应用中有更多数据不允许调用它),另一个要考虑的选项是使用第二个频道(例如 tokio mspc)并通过该频道中继消息。
我正在使用 crate 通知来监视文件更改。 我的问题是,我在异步环境中使用它。今天我意识到,我的“通知”线程正在使用 100% CPU...
我需要示例帮助,如何在异步世界中使用此同步箱。
我生成了一个新的异步线程,运行 里面有一个循环。在循环中,我检查是否有新通知可用,然后让出以启用其他异步任务 运行.
而正是在这里,我消耗了 100% 的 CPU。如何正确执行此操作? 在我看来,阻塞 recv() 是不可能的,因为我想对 CTRL-C 做出反应。 我需要睡觉而不是屈服吗?
tokio::select! {
_ = async {
loop {
if let Ok(DebouncedEvent::Create(path)) = watcher_rx.try_recv() {
process_create(path).await;
}
actix_rt::task::yield_now().await;
}
} => {},
_ = actix_rt::signal::ctrl_c() => {},
}
我重构了我的代码并围绕通知箱创建了一个包装器。我整合了@stepan 的想法。 但是在 spawn_blocking 中我不能使用异步函数。因为我在异步环境中需要这个,所以我使用异步通道...
mod mywatcher {
use async_std::path::Path;
use notify::Error;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub use notify::DebouncedEvent;
pub use notify::RecursiveMode;
pub async fn watcher(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<MyWatcher, Error> {
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
let notify = notify::watcher(notify_tx, delay)?;
tokio::task::spawn_blocking(move || async {
process_changes(Arc::new(Mutex::new(notify_rx)), tx).await
})
.await
.unwrap();
Ok(MyWatcher { notify })
}
async fn process_changes(
notify_rx: Arc<Mutex<std::sync::mpsc::Receiver<DebouncedEvent>>>,
watcher_tx: Sender<DebouncedEvent>,
) {
println!("Process Changes");
loop {
println!("In loop and waiting...");
let rx = notify_rx.lock().await;
let event = rx.recv();
if event.is_err() {
return;
}
let event = event.unwrap();
println!("EVENT {:?}", event);
watcher_tx.send(event).await.unwrap();
}
}
pub struct MyWatcher {
notify: notify::FsEventWatcher,
}
impl MyWatcher {
pub async fn watch<P: AsRef<Path>>(
&mut self,
p: P,
recursive_mode: RecursiveMode,
) -> Result<(), Error> {
use notify::Watcher;
self.notify.watch(p.as_ref(), recursive_mode)
}
pub async fn unwatch<P: AsRef<Path>>(&mut self, p: P) -> Result<(), Error> {
use notify::Watcher;
self.notify.unwatch(p.as_ref())
}
}
}
#[tokio::main]
async fn main() {
use mywatcher::RecursiveMode;
use std::time::Duration;
use tokio::sync::mpsc;
let (watcher_tx, watcher_rx) = mpsc::channel(10);
let mut watcher = mywatcher::watcher(watcher_tx, Duration::from_secs(10))
.await
.expect("create watcher failed");
watcher
.watch(
"/Develope/async-notify",
RecursiveMode::NonRecursive,
)
.await
.expect("watch failed");
for i in 1..30 {
println!("waiting...");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
watcher
.unwatch("/Develope/async-notify")
.await
.expect("unwatch failed");
}
Cargo.toml 依赖关系:
tokio = { version = "1", features = [
"macros",
"rt-multi-thread",
"sync",
"time",
] }
notify = "4.0.17"
async-std = "1.10.0"
使用单独的通知任务作为可能的解决方案之一。在这种情况下不需要 select
:
tokio::task::spawn_blocking(move || {
loop {
if let Ok(DebouncedEvent::Create(path)) = rx.recv() {
tokio::spawn(process_create(path));
} else {
break;
}
}
});
actix_rt::signal::ctrl_c().await;
如果“process_create”不能在那个地方直接调用(例如,实际应用中有更多数据不允许调用它),另一个要考虑的选项是使用第二个频道(例如 tokio mspc)并通过该频道中继消息。