如何有效地实现 Windows 超时的 Path.exists() 版本
How to efficiently implement a version of Path.exists() with a timeout on Windows
我的问题的背景是我在 windows 上使用“Path.exists()”来检查网络路径是否可用。如果不是,“Path.exists()”会在它 returns False
之前阻塞相当长的一段时间(20 秒或更长时间)。
这太长了,所以我想定义一个时间限制,之后我认为资源不可用。
我想到的是围绕“路径”的包装器 class,它使用 ThreadPoolExecutor
调用 Path
s exists
方法并在超时后取消调用:
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
class TimeoutPath:
executor = ThreadPoolExecutor(max_workers=1)
def __init__(self, *args, timeout: float = 1, **kwargs):
self._path = Path(*args, **kwargs)
self.timeout = timeout
def exists(self) -> bool:
future = TimeoutPath.executor.submit(self._path.exists)
start_time = time.time()
while (time.time() - start_time) < self.timeout:
if future.done():
return future.result()
future.cancel()
return False
def get_path(self) -> Path:
return self._path
def __getattr__(self, name: str) -> Any:
return getattr(self._path, name)
def __str__(self) -> str:
return str(self._path)
这有效,但即使在本地文件夹上,即使超时设置为一秒,有时也会 returns False
对于本地驱动器上的现有文件夹,我认为这是由于启动线程所需的开销。
所以我想我的问题是:是否可以通过使用另一个 threading/multiprocessing 实现来减少线程池引入的开销?或者是否有另一种我没有看到的没有线程的可能解决方案?
感谢 @IInspectable 我想我明白了我的错误所在。
在 while 循环中轮询 future.done
方法会产生太多不必要的循环。
Future.result()
实际上已经有一个超时参数并阻塞,直到将来有结果或超时:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from pathlib import Path
from typing import Any
class TimeoutPath:
executor = ThreadPoolExecutor(max_workers=1)
def __init__(self, *args, timeout: float = 1, **kwargs):
self._path = Path(*args, **kwargs)
self.timeout = timeout
def exists(self) -> bool:
future = TimeoutPath.executor.submit(self._path.exists)
try:
return future.result(self.timeout)
except TimeoutError:
return False
def get_path(self) -> Path:
return self._path
def __getattr__(self, name: str) -> Any:
return getattr(self._path, name)
def __str__(self) -> str:
return str(self._path)
在我的机器上,这对于 2 毫秒的超时非常可靠。
我的问题的背景是我在 windows 上使用“Path.exists()”来检查网络路径是否可用。如果不是,“Path.exists()”会在它 returns False
之前阻塞相当长的一段时间(20 秒或更长时间)。
这太长了,所以我想定义一个时间限制,之后我认为资源不可用。
我想到的是围绕“路径”的包装器 class,它使用 ThreadPoolExecutor
调用 Path
s exists
方法并在超时后取消调用:
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
class TimeoutPath:
executor = ThreadPoolExecutor(max_workers=1)
def __init__(self, *args, timeout: float = 1, **kwargs):
self._path = Path(*args, **kwargs)
self.timeout = timeout
def exists(self) -> bool:
future = TimeoutPath.executor.submit(self._path.exists)
start_time = time.time()
while (time.time() - start_time) < self.timeout:
if future.done():
return future.result()
future.cancel()
return False
def get_path(self) -> Path:
return self._path
def __getattr__(self, name: str) -> Any:
return getattr(self._path, name)
def __str__(self) -> str:
return str(self._path)
这有效,但即使在本地文件夹上,即使超时设置为一秒,有时也会 returns False
对于本地驱动器上的现有文件夹,我认为这是由于启动线程所需的开销。
所以我想我的问题是:是否可以通过使用另一个 threading/multiprocessing 实现来减少线程池引入的开销?或者是否有另一种我没有看到的没有线程的可能解决方案?
感谢 @IInspectable 我想我明白了我的错误所在。
在 while 循环中轮询 future.done
方法会产生太多不必要的循环。
Future.result()
实际上已经有一个超时参数并阻塞,直到将来有结果或超时:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from pathlib import Path
from typing import Any
class TimeoutPath:
executor = ThreadPoolExecutor(max_workers=1)
def __init__(self, *args, timeout: float = 1, **kwargs):
self._path = Path(*args, **kwargs)
self.timeout = timeout
def exists(self) -> bool:
future = TimeoutPath.executor.submit(self._path.exists)
try:
return future.result(self.timeout)
except TimeoutError:
return False
def get_path(self) -> Path:
return self._path
def __getattr__(self, name: str) -> Any:
return getattr(self._path, name)
def __str__(self) -> str:
return str(self._path)
在我的机器上,这对于 2 毫秒的超时非常可靠。