我想在 python 中锁定异步方法的一部分,基本上它应该是一个(基于名称的锁),其中名称作为参数传递给方法

I want to lock a part of a async method in python basically it should be a(name based lock) where name is passed as argument to the method

在下面的代码中,test() 函数等待来自 UI 的请求,因为请求是以 JSON 形式接收的,它通过调用 handle() 函数为每个请求创建一个任务

async def test():
     loop = asyncio.get_running_loop()
     req = await receiver.recv_string()
     logger.debug(f"Request received {req}")
     req_json = json.loads(req)
     logger.debug("Await create_task")
     loop.create_task(handle(req_json))

async def handle(req_json_):
     req_name = req_json_.get(req_name)
    # acquire lock here based on req_name if request comes with different name acquire the lock 
    # but if the request comes with same name block the request  
    # untill the req for that name is completed if the request is already completed then acquire 
    # the lock  with that name 
     logger.info(f"Request finished with req name {req_name} for action patch stack")

如何使用 asyncio 模块或 python

中的任何其他方式实现此目的

在我看来,您需要做的就是维护一个锁的字典,其键是变量 req_name 给出的名称,其值是相应的锁。如果键 reg_name 不在字典中,那么将为该键添加一个新锁:

import asyncio
from collections import defaultdict


# dictionary of locks
locks = defaultdict(asyncio.Lock)

async def handle(req_json_):
    req_name = req_json_.get(req_name)
    # acquire lock here based on req_name if request comes with different name acquire the lock 
    # but if the request comes with same name block the request  
    # untill the req for that name is completed if the request is already completed then acquire 
    # the lock  with that name
    # Get lock from locks dictionary with name req_name. If it
    # does not exit, then create a new lock and store it with key
    # req_name and return it:
    lock = locks[req_name]
    async with lock:
        # do whatever needs to be done:
        ...
    logger.info(f"Request finished with req name {req_name} for action patch stack")

更新

如果您需要使获取锁的尝试超时,则创建一个协同程序,该协程通过适当的 timeout[=24 调用 asyncio.wait_for 来获取传递的锁参数=] 参数:

import asyncio
from collections import defaultdict


async def acquire_lock(lock):
    await lock.acquire()

# dictionary of locks
locks = defaultdict(asyncio.Lock)

async def handle(req_json_):
    req_name = req_json_.get(req_name)
    lock = locks[req_name]
    # Try to acquire the lock but timeout after 1 second:
    try:
        await asyncio.wait_for(acquire_lock(lock), timeout=1.0)
    except asyncio.TimeoutError:
        # Here if the lockout could not be acquired:
        ...
    else:
        # Do whatever needs to be done
        # The lock must now be explicitly released:
        try:
            ...
            logger.info(f"Request finished with req name {req_name} for action patch stack")
        finally:
            lock.release()