asyncio中如何使用kqueue进行文件监控?

How to use kqueue for file monitoring in asyncio?

我想使用 kqueue 来监视文件的更改。我可以看到如何以线程方式使用 select.kqueue()。

我正在寻找一种将它与 asyncio 一起使用的方法。我可能在这里错过了一些非常明显的东西。我知道 python 使用 kqueue for asyncio on macos。我很高兴任何解决方案只能在使用 kqueue 选择器时起作用。

到目前为止,我能看到的唯一方法是创建一个线程以从另一个线程持续 kqueue.control(),然后使用 asyncio.loop.call_soon_threadsafe() 注入事件。我觉得应该有更好的方法。

您可以使用 loop.add_reader() 将来自 kqueue 对象的 FD 作为 reader 添加到控制循环中。然后控制循环会通知您事件已准备好收集。

这样做有两个特点对于熟悉 kqueue 的人来说可能很奇怪:

  • select.kqueue.control 是一个 one-shot 方法,它首先更改监视器并等待新事件到达。因为我们不希望它阻塞,所以必须将这两个操作拆分为一个 non-blocking 调用来修改监视器,然后再调用 non-blocking 来收集结果事件。
  • 因为我们永远不想阻塞,所以永远不能使用超时。这可以是 re-implemented 和 asyncio.wait_for()

有更有效的方法来写这个,但这里有一个如何用异步方法(这里命名为 kqueue_control)完全替换 select.kqueue.control 的示例:

async def kqueue_control(kqueue: select.kqueue,
                         changes: Optional[Iterable[select.kevent]],
                         max_events: int,
                         timeout: Optional[int]):

    def receive_result():
        try:
            # Events are ready to collect; fetch them but do not block
            results = kqueue.control(None, max_events, 0)
        except Exception as ex:
            future.set_exception(ex)
        else:
            future.set_result(results)
        finally:
            loop.remove_reader(kqueue.fileno())
            
    # If this call is non-blocking then just execute it
    if timeout == 0 or max_events == 0:
        return kqueue.control(changes, max_events, 0)
    
    # Apply the changes, but DON'T wait for events
    kqueue.control(changes, 0)
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.add_reader(kqueue.fileno(), receive_result)
    if timeout is None:
        return await future
    else:
        return await asyncio.wait_for(future, timeout)