如何在 python 中创建可连接的信号量?
How to create a joinable semaphore in python?
在某些用例中,我需要等待所有已创建的线程完成并根据它们的结果做出一些决定,看看我们是否需要进一步移动 - 没有 ThreadPoolExecutor.shutdown()
.
我是这样实现的:
from threading import BoundedSemaphore, Event
class JoinSemaphore(BoundedSemaphore):
def __init__(self, value=1):
super().__init__(value)
self._empty = Event()
def join(self, timeout=None):
if self._value < self._initial_value:
self._empty.wait(timeout)
def release(self):
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
elif self._value == self._initial_value - 1:
self._empty.set()
self._value += 1
self._cond.notify()
def acquired(self):
with self._cond:
return self._initial_value - self._value
我在这里计算 self._value < self._initial_value
没有任何守卫,它有风险。
当我像下面这样编写 join() 函数以防止在计算时进行不必要的更改 self._value < self._initial_value
当主线程加入信号量时我将面临死锁,此时其他线程不获取 release()
锁定,因为主线程已经累积了它,但主线程仍在等待其他线程。所以这个实现是不正确的。
def join(self, timeout=None):
with self._cond:
if self._value < self._initial_value:
self._empty.wait(timeout)
在第三个实现中,我不能保证当我想 wait()
事件 _empty
时,其他线程不可能提交它们的结果并释放锁。
def join(self, timeout=None):
with self._cond:
if self._value == self._initial_value:
return
self._empty.wait(timeout)
问题是:
我如何使用锁正确计算 self._value < self._initial_value
并等待 _empty
事件并在等待它之前释放锁以避免死锁?
非常感谢
这是我找到的解决这个问题的方法,我想继承基本信号量 class 但找不到解决问题的简单解决方案,所以我决定编辑基本 BoundSemaphore class:
from threading import Event, Condition, Lock
from time import monotonic as _time
class JoinSemaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
self._initial_value = value
self._empty = Event()
self._empty.set()
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._empty.clear()
self._value -= 1
rc = True
return rc
__enter__ = acquire
def join(self, timeout=None):
self._empty.wait(timeout)
def release(self):
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
elif self._value == self._initial_value - 1:
self._empty.set()
self._value += 1
self._cond.notify()
def acquired(self):
with self._cond:
return self._initial_value - self._value
差异是:
class JoinSemaphore:
def __init__(self, value=1):
...
self._empty = Event()
self._empty.set()
def acquire(self, blocking=True, timeout=None):
...
with self._cond:
while ...:
...
else:
self._empty.clear()
...
def join(self, timeout=None):
self._empty.wait(timeout)
def release(self):
...
elif self._value == self._initial_value - 1:
self._empty.set()
...
def acquired(self):
with self._cond:
return self._initial_value - self._value
在某些用例中,我需要等待所有已创建的线程完成并根据它们的结果做出一些决定,看看我们是否需要进一步移动 - 没有 ThreadPoolExecutor.shutdown()
.
我是这样实现的:
from threading import BoundedSemaphore, Event
class JoinSemaphore(BoundedSemaphore):
def __init__(self, value=1):
super().__init__(value)
self._empty = Event()
def join(self, timeout=None):
if self._value < self._initial_value:
self._empty.wait(timeout)
def release(self):
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
elif self._value == self._initial_value - 1:
self._empty.set()
self._value += 1
self._cond.notify()
def acquired(self):
with self._cond:
return self._initial_value - self._value
我在这里计算 self._value < self._initial_value
没有任何守卫,它有风险。
当我像下面这样编写 join() 函数以防止在计算时进行不必要的更改 self._value < self._initial_value
当主线程加入信号量时我将面临死锁,此时其他线程不获取 release()
锁定,因为主线程已经累积了它,但主线程仍在等待其他线程。所以这个实现是不正确的。
def join(self, timeout=None):
with self._cond:
if self._value < self._initial_value:
self._empty.wait(timeout)
在第三个实现中,我不能保证当我想 wait()
事件 _empty
时,其他线程不可能提交它们的结果并释放锁。
def join(self, timeout=None):
with self._cond:
if self._value == self._initial_value:
return
self._empty.wait(timeout)
问题是:
我如何使用锁正确计算 self._value < self._initial_value
并等待 _empty
事件并在等待它之前释放锁以避免死锁?
非常感谢
这是我找到的解决这个问题的方法,我想继承基本信号量 class 但找不到解决问题的简单解决方案,所以我决定编辑基本 BoundSemaphore class:
from threading import Event, Condition, Lock
from time import monotonic as _time
class JoinSemaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
self._initial_value = value
self._empty = Event()
self._empty.set()
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._empty.clear()
self._value -= 1
rc = True
return rc
__enter__ = acquire
def join(self, timeout=None):
self._empty.wait(timeout)
def release(self):
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
elif self._value == self._initial_value - 1:
self._empty.set()
self._value += 1
self._cond.notify()
def acquired(self):
with self._cond:
return self._initial_value - self._value
差异是:
class JoinSemaphore:
def __init__(self, value=1):
...
self._empty = Event()
self._empty.set()
def acquire(self, blocking=True, timeout=None):
...
with self._cond:
while ...:
...
else:
self._empty.clear()
...
def join(self, timeout=None):
self._empty.wait(timeout)
def release(self):
...
elif self._value == self._initial_value - 1:
self._empty.set()
...
def acquired(self):
with self._cond:
return self._initial_value - self._value