python3 - 仅在给定进程内命名信号量?

python3 - Named semaphores only within a given process?

我知道有 python 模块允许使用 IPC 和 System V 命名信号量。但是,这些资源存在于系统级别。对于我的特定多线程 python3 应用程序,我需要命名信号量以保护某些完全不相关的代码部分,但这些信号量应该只特定于进程,而不是系统范围的。

我一直无法找到任何 python 代码来实现仅存在于当前多线程进程中的命名信号量。有谁知道已经写过的类似代码吗?

我特别想要信号量,而不仅仅是一个简单的互斥体,因为我想允许对进程中的这些关键代码部分进行一定数量的并发访问,达到一些可配置的最大访问数量。

有什么建议吗?

提前致谢。

为什么不使用 threading.Semaphore

如果另一个 进程.

访问信号量,则只需要 名称

根据定义,线程共享一个地址 space,因此它们基本上可以访问正确范围内的所有内容。

编辑: 请注意 Semaphore 是 class。因此,每次调用 threading.Semaphore() 都会创建 Semaphore class.

单独实例

根据我在这里与 Ronald Smith 的讨论,我现在明白我做了一个错误的假设。我错误地认为每个 python 解释器只有一个共享的底层信号量实例。现在我发现情况并非如此,我明白每个进程可以使用多个信号量。

但是,我仍然想要命名信号量,以处理我的代码的多个部分可能共享一个信号量,而其他部分可能想要使用其他信号量的情况。所以我在 threading.BoundedSemaphore 周围实现了一个命名信号量包装器来处理这种情况。

我删除了我之前的代码(通过命名队列近似命名信号量),并将其替换为以下实现命名的、特定于进程的信号量的代码...

def _out(msg=None):
    if msg:
        sys.stdout.write(msg)
    sys.stdout.write('\n')
    # Just in case ...
    sys.stdout.flush()

def _err(msg=None):
    if msg:
            sys.stderr.write(msg)
    sys.stderr.write('\n')
    # Just in case ...
    sys.stderr.flush()

# This factory is used by the semaphore-based locker, below.
class MyNamedSem(object):
    __uniqueid = uuid.uuid4()

    @classmethod
    def get(cls, name, value=1):
        if not name:
            raise ValueError('missing semaphore name')
        if not value or value < 1:
            raise ValueError('value must be a positive integer')
        with threading.RLock():
            sem = cls._cache.get(name, None)
            if not sem:
                sem = MyNamedSem(name, value=value, _uid=cls.__uniqueid)
            cls._cache[name] = sem
            return sem

    def __init__(self, name, value=1, _uid=None):
        # This makes it impossible, for all intents and purposes,
        # to get an instance of this class without going through
        # the factory method.
        assert(_uid == self.__class__.__uniqueid), "use the 'get' factory method"

        self._name = name
        self._s    = threading.BoundedSemaphore(value=value)
MyNamedSem._cache = {}

class MySemLock(object):

    def __init__(self, name, maxaccesses=1, timeout=None, verbose=True):
        if name:
            self._name = name
        else:
            raise ValueError('!!! missing semaphore name')
        if not maxaccesses or maxaccesses < 1:
            raise ValueError('!!! max accesses must be a positive integer')
        self._timeout = timeout
        self._sem     = MyNamedSem.get(self._name, value=maxaccesses)._s
        self._verbose = verbose

    def acquire(self):
        try:
            rc = self._sem.acquire(blocking=True, timeout=self._timeout)
            if rc:
                return True
            elif self._timeout:
                if self._verbose:
                    _err('!!! {} timeout'.format(self))
                return False
            else:
                if self._verbose:
                    _err('!!! {} failed'.format(self))
                return False
        except Exception as e:
            if self._verbose:
                _err('!!! {} acquire exception: {}'.format(self, e))
            return False
        return True

    def release(self):
        try:
            # The only way this release call should fail is if
            # release has been called too many times.
            self._sem.release()
            if self._verbose:
                _out('=== {} released'.format(self))
            return True
        except Exception as e:
            if self._verbose:
                _err('!!! {} release exception: {}'.format(self, e))
            return False

    # Dijkstra's traditional names ...
    P = acquire
    V = release

    def __enter__(self):
        if self.acquire():
            return self
        else:
            return None

    def __exit__(self, tp, val, tb):
        self.release()
        if tp:
            _err('!!! {} exception: {}\n{}\n{}'.format(self, tp, val, tb))
        return False

    def __repr__(self):
        return '<semlock: {}>'.format(self._name)

我会这样使用它:

with MySemLock('section0', maxaccess=10):
    # Critical section which allows 10 concurrent accesses.