在异步环境中使用 crate notify

Using crate notify in async environment

我正在使用 crate 通知来监视文件更改。 我的问题是,我在异步环境中使用它。今天我意识到,我的“通知”线程正在使用 100% CPU...

我需要示例帮助,如何在异步世界中使用此同步箱。

我生成了一个新的异步线程,运行 里面有一个循环。在循环中,我检查是否有新通知可用,然后让出以启用其他异步任务 运行.

而正是在这里,我消耗了 100% 的 CPU。如何正确执行此操作? 在我看来,阻塞 recv() 是不可能的,因为我想对 CTRL-C 做出反应。 我需要睡觉而不是屈服吗?

tokio::select! {
    _ = async {
        loop {
            if let Ok(DebouncedEvent::Create(path)) = watcher_rx.try_recv() {
                process_create(path).await;
            }
            actix_rt::task::yield_now().await;
        }
    } => {},
    _ = actix_rt::signal::ctrl_c() => {},
}

我重构了我的代码并围绕通知箱创建了一个包装器。我整合了@stepan 的想法。 但是在 spawn_blocking 中我不能使用异步函数。因为我在异步环境中需要这个,所以我使用异步通道...

mod mywatcher {
    use async_std::path::Path;
    use notify::Error;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::Mutex;

    pub use notify::DebouncedEvent;
    pub use notify::RecursiveMode;

    pub async fn watcher(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<MyWatcher, Error> {
        let (notify_tx, notify_rx) = std::sync::mpsc::channel();
        let notify = notify::watcher(notify_tx, delay)?;

        tokio::task::spawn_blocking(move || async {
            process_changes(Arc::new(Mutex::new(notify_rx)), tx).await
        })
        .await
        .unwrap();

        Ok(MyWatcher { notify })
    }

    async fn process_changes(
        notify_rx: Arc<Mutex<std::sync::mpsc::Receiver<DebouncedEvent>>>,
        watcher_tx: Sender<DebouncedEvent>,
    ) {
        println!("Process Changes");
        loop {
            println!("In loop and waiting...");
            let rx = notify_rx.lock().await;
            let event = rx.recv();
            if event.is_err() {
                return;
            }

            let event = event.unwrap();
            println!("EVENT {:?}", event);
            watcher_tx.send(event).await.unwrap();
        }
    }

    pub struct MyWatcher {
        notify: notify::FsEventWatcher,
    }

    impl MyWatcher {
        pub async fn watch<P: AsRef<Path>>(
            &mut self,
            p: P,
            recursive_mode: RecursiveMode,
        ) -> Result<(), Error> {
            use notify::Watcher;
            self.notify.watch(p.as_ref(), recursive_mode)
        }

        pub async fn unwatch<P: AsRef<Path>>(&mut self, p: P) -> Result<(), Error> {
            use notify::Watcher;
            self.notify.unwatch(p.as_ref())
        }
    }
}

#[tokio::main]
async fn main() {
    use mywatcher::RecursiveMode;
    use std::time::Duration;
    use tokio::sync::mpsc;

    let (watcher_tx, watcher_rx) = mpsc::channel(10);
    let mut watcher = mywatcher::watcher(watcher_tx, Duration::from_secs(10))
        .await
        .expect("create watcher failed");

    watcher
        .watch(
            "/Develope/async-notify",
            RecursiveMode::NonRecursive,
        )
        .await
        .expect("watch failed");

    for i in 1..30 {
        println!("waiting...");
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }

    watcher
        .unwatch("/Develope/async-notify")
        .await
        .expect("unwatch failed");
}

Cargo.toml 依赖关系:

tokio = { version = "1", features = [
    "macros",
    "rt-multi-thread",
    "sync",
    "time",
] }
notify = "4.0.17"
async-std = "1.10.0"

使用单独的通知任务作为可能的解决方案之一。在这种情况下不需要 select

tokio::task::spawn_blocking(move || {
  loop {
    if let Ok(DebouncedEvent::Create(path)) = rx.recv() {
       tokio::spawn(process_create(path));
    } else {
       break;
    }
  }
});

actix_rt::signal::ctrl_c().await;

如果“process_create”不能在那个地方直接调用(例如,实际应用中有更多数据不允许调用它),另一个要考虑的选项是使用第二个频道(例如 tokio mspc)并通过该频道中继消息。