如何在 python 中创建可连接的信号量?

How to create a joinable semaphore in python?

在某些用例中,我需要等待所有已创建的线程完成并根据它们的结果做出一些决定,看看我们是否需要进一步移动 - 没有 ThreadPoolExecutor.shutdown().

我是这样实现的:

from threading import BoundedSemaphore, Event


class JoinSemaphore(BoundedSemaphore):
    def __init__(self, value=1):
        super().__init__(value)
        self._empty = Event()

    def join(self, timeout=None):
        if self._value < self._initial_value:
            self._empty.wait(timeout)

    def release(self):
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            elif self._value == self._initial_value - 1:
                self._empty.set()

            self._value += 1
            self._cond.notify()

    def acquired(self):
        with self._cond:
            return self._initial_value - self._value

我在这里计算 self._value < self._initial_value 没有任何守卫,它有风险。 当我像下面这样编写 join() 函数以防止在计算时进行不必要的更改 self._value < self._initial_value 当主线程加入信号量时我将面临死锁,此时其他线程不获取 release() 锁定,因为主线程已经累积了它,但主线程仍在等待其他线程。所以这个实现是不正确的。

    def join(self, timeout=None):
        with self._cond:
            if self._value < self._initial_value:
                self._empty.wait(timeout)

在第三个实现中,我不能保证当我想 wait() 事件 _empty 时,其他线程不可能提交它们的结果并释放锁。

    def join(self, timeout=None):
        with self._cond:
            if self._value == self._initial_value:
                return
        self._empty.wait(timeout)

问题是:

我如何使用锁正确计算 self._value < self._initial_value 并等待 _empty 事件并在等待它之前释放锁以避免死锁?

非常感谢

这是我找到的解决这个问题的方法,我想继承基本信号量 class 但找不到解决问题的简单解决方案,所以我决定编辑基本 BoundSemaphore class:

from threading import Event, Condition, Lock
from time import monotonic as _time


class JoinSemaphore:

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value
        self._initial_value = value
        self._empty = Event()
        self._empty.set()

    def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._empty.clear()
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def join(self, timeout=None):
        self._empty.wait(timeout)

    def release(self):
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            elif self._value == self._initial_value - 1:
                self._empty.set()

            self._value += 1
            self._cond.notify()

    def acquired(self):
        with self._cond:
            return self._initial_value - self._value

差异是:

class JoinSemaphore:

    def __init__(self, value=1):
        ...
        self._empty = Event()
        self._empty.set()

    def acquire(self, blocking=True, timeout=None):
        ...
        with self._cond:
            while ...:
        ...
            else:
                self._empty.clear()
        ...

    def join(self, timeout=None):
        self._empty.wait(timeout)

    def release(self):
        ...
            elif self._value == self._initial_value - 1:
                self._empty.set()
        ...

    def acquired(self):
        with self._cond:
            return self._initial_value - self._value