Python中是否有参数锁定机制?

Is there a parametric locking mechanism in Python?

parametric 我的意思是可以与参数一起使用的锁,并且只锁定使用相同参数的线程。该参数可以有数千个不同的值,并且不可能创建一个包含 threading.Lock 个对象的字典,每个参数值一个。

我的 Web 服务器上的一个函数接受两个参数,一个组和其他参数。每次调用该函数时,它都会检查组文件是否已更改(非常快),如果已更改,则对其进行处理(非常慢)。每个组的慢处理完全独立于其他组并且可以同时发生,但是每个组的两个元素不能同时处理,也不能在处理组的同时处理。

我能够使用正在处理的组的全局列表让它工作,但现在我已经完成了它,我认为它真的很难看,必须有更好的方法。

下面的代码片段显示了我正在寻找的内容。它使用虚数LockWithGroup。 Python中有类似的东西吗?

process_lock = threading.LockWithGroup()
def process_element(group, element):
    print('Start', group)

    with process_lock(group):
        if needs_update(group):
            print('Updating', group)
            update_group(group)
            print('Updated', group)

    with process_lock(group):
        retval = do_something_with(group, element)

    print('End', group)

    return retval

process_element('g1', e1)   # a
process_element('g1', e2)   #  b
process_element('g1', e3)   #   c
process_element('g2', e4)   #    d
process_element('g2', e5)   #     e

输出:

> Start g1                  # a
> Start g2                  #    d
> Updating g1               # a
> Updating g2               #    d
> Updated g1                # a
> End g1                    # a
> Start g1                  #  b
> End g1                    #  b
> Start g1                  #   c
> Updated g2                #    d
> End g2                    #    d
> Start g2                  #     e    
> End g1                    #   c
> End g2                    #     e

受评论中提到的 答案的启发,我创建了一个似乎可以完成这项工作的 class。

我使用了那个答案中的代码,添加了 timeoutblocking 参数并将其放在 class 中,这样我就可以将它用作上下文管理器。 class使用静态方法,因此可以实例化一次,也可以创建多次(如下面的测试slow_worker_2)。

代码的第一部分是 class,第二部分测试显式 acquirerelease 以及使用 with 的上下文管理器。

import threading
import time

namespace_lock = threading.Lock()
namespace = {}
counters = {}


class NamespaceLock:
    def __init__(self, group):
        self.group = group

    def __enter__(self):
        self.__class__.acquire_lock(self.group)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__class__.release_lock(self.group)

    @staticmethod
    def acquire_lock(value, blocking=True, timeout=-1.0):
        with namespace_lock:
            if value in namespace:
                counters[value] += 1
            else:
                namespace[value] = threading.Lock()
                counters[value] = 1

        return namespace[value].acquire(blocking=blocking, timeout=timeout)

    @staticmethod
    def release_lock(value):
        with namespace_lock:
            if counters[value] == 1:
                del counters[value]
                lock = namespace.pop(value)
            else:
                counters[value] -= 1
                lock = namespace[value]

        lock.release()


def slow_worker_1(group, seconds):
    if NamespaceLock.acquire_lock(group, timeout=2.5):
        print('Start   {} {}'.format(group, seconds))
        time.sleep(seconds)
        print('End     {} {}'.format(group, seconds))
        NamespaceLock.release_lock(group)
    else:
        print('Timeout {} {}'.format(group, seconds))


def slow_worker_2(group, seconds):
    with NamespaceLock(group):
        print('Start {} {}'.format(group, seconds))
        time.sleep(seconds)
        print('End   {} {}'.format(group, seconds))


def join_all(name):
    for t in threading.enumerate():
        if t.name == name:
            t.join()


if __name__ == '__main__':
    print('explicit acquire and release')

    threading.Thread(target=slow_worker_1, args=('g1', 1), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g2', 2), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g1', 3), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g2', 4), name='worker').start()
    threading.Thread(target=slow_worker_1, args=('g1', 5), name='worker').start()

    join_all('worker')

    print('context manager')

    threading.Thread(target=slow_worker_2, args=('g1', 1), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g2', 2), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g1', 3), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g2', 4), name='worker').start()
    threading.Thread(target=slow_worker_2, args=('g1', 5), name='worker').start()

    join_all('worker')