asyncio中如何使用kqueue进行文件监控?
How to use kqueue for file monitoring in asyncio?
我想使用 kqueue 来监视文件的更改。我可以看到如何以线程方式使用 select.kqueue()。
我正在寻找一种将它与 asyncio 一起使用的方法。我可能在这里错过了一些非常明显的东西。我知道 python 使用 kqueue for asyncio on macos。我很高兴任何解决方案只能在使用 kqueue 选择器时起作用。
到目前为止,我能看到的唯一方法是创建一个线程以从另一个线程持续 kqueue.control()
,然后使用 asyncio.loop.call_soon_threadsafe()
注入事件。我觉得应该有更好的方法。
您可以使用 loop.add_reader() 将来自 kqueue 对象的 FD 作为 reader 添加到控制循环中。然后控制循环会通知您事件已准备好收集。
这样做有两个特点对于熟悉 kqueue 的人来说可能很奇怪:
- select.kqueue.control 是一个 one-shot 方法,它首先更改监视器并等待新事件到达。因为我们不希望它阻塞,所以必须将这两个操作拆分为一个 non-blocking 调用来修改监视器,然后再调用 non-blocking 来收集结果事件。
- 因为我们永远不想阻塞,所以永远不能使用超时。这可以是 re-implemented 和
asyncio.wait_for()
有更有效的方法来写这个,但这里有一个如何用异步方法(这里命名为 kqueue_control
)完全替换 select.kqueue.control
的示例:
async def kqueue_control(kqueue: select.kqueue,
changes: Optional[Iterable[select.kevent]],
max_events: int,
timeout: Optional[int]):
def receive_result():
try:
# Events are ready to collect; fetch them but do not block
results = kqueue.control(None, max_events, 0)
except Exception as ex:
future.set_exception(ex)
else:
future.set_result(results)
finally:
loop.remove_reader(kqueue.fileno())
# If this call is non-blocking then just execute it
if timeout == 0 or max_events == 0:
return kqueue.control(changes, max_events, 0)
# Apply the changes, but DON'T wait for events
kqueue.control(changes, 0)
loop = asyncio.get_running_loop()
future = loop.create_future()
loop.add_reader(kqueue.fileno(), receive_result)
if timeout is None:
return await future
else:
return await asyncio.wait_for(future, timeout)
我想使用 kqueue 来监视文件的更改。我可以看到如何以线程方式使用 select.kqueue()。
我正在寻找一种将它与 asyncio 一起使用的方法。我可能在这里错过了一些非常明显的东西。我知道 python 使用 kqueue for asyncio on macos。我很高兴任何解决方案只能在使用 kqueue 选择器时起作用。
到目前为止,我能看到的唯一方法是创建一个线程以从另一个线程持续 kqueue.control()
,然后使用 asyncio.loop.call_soon_threadsafe()
注入事件。我觉得应该有更好的方法。
您可以使用 loop.add_reader() 将来自 kqueue 对象的 FD 作为 reader 添加到控制循环中。然后控制循环会通知您事件已准备好收集。
这样做有两个特点对于熟悉 kqueue 的人来说可能很奇怪:
- select.kqueue.control 是一个 one-shot 方法,它首先更改监视器并等待新事件到达。因为我们不希望它阻塞,所以必须将这两个操作拆分为一个 non-blocking 调用来修改监视器,然后再调用 non-blocking 来收集结果事件。
- 因为我们永远不想阻塞,所以永远不能使用超时。这可以是 re-implemented 和
asyncio.wait_for()
有更有效的方法来写这个,但这里有一个如何用异步方法(这里命名为 kqueue_control
)完全替换 select.kqueue.control
的示例:
async def kqueue_control(kqueue: select.kqueue,
changes: Optional[Iterable[select.kevent]],
max_events: int,
timeout: Optional[int]):
def receive_result():
try:
# Events are ready to collect; fetch them but do not block
results = kqueue.control(None, max_events, 0)
except Exception as ex:
future.set_exception(ex)
else:
future.set_result(results)
finally:
loop.remove_reader(kqueue.fileno())
# If this call is non-blocking then just execute it
if timeout == 0 or max_events == 0:
return kqueue.control(changes, max_events, 0)
# Apply the changes, but DON'T wait for events
kqueue.control(changes, 0)
loop = asyncio.get_running_loop()
future = loop.create_future()
loop.add_reader(kqueue.fileno(), receive_result)
if timeout is None:
return await future
else:
return await asyncio.wait_for(future, timeout)