In-order access to shared resource in python3 Multiprocessing

Given the following MWE:

import multiprocessing

def thread_function(shared_resource : dict, val : int) -> None: 

    if val not in shared_resource.keys(): 
        print(f"Value {val} is not in keys!")
        shared_resource[val] = 1

    shared_resource[val] += 1

def main():

    no_of_threads = 5
    manager       = multiprocessing.Manager()
    
    shared_resource = manager.dict()

    values = [1 , 1 , 2 , 1 , 3, 3, 3, 4, 5]

    with multiprocessing.Pool(no_of_threads) as pool:

        pool.starmap(thread_function,
                     [ (shared_resource, val) for val in values], 
                     chunksize=1)

    print(shared_resource)


if __name__ == "__main__": main()

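I have a readers/writers problem that I don't know how to solve. The dictionary is a resource shared between the threads, and I want access to it to be atomic, i.e. to avoid the situation where two threads both try to write the same value to it. For example, here is an incorrect output: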

Value 1 is not in keys!
Value 1 is not in keys! # <-- Nope! 
Value 2 is not in keys!
Value 3 is not in keys!
Value 4 is not in keys!
Value 5 is not in keys!

On the other hand, the threads may happen to access the resource in the right order, in which case the output is correct, i.e.

Value 1 is not in keys!
Value 2 is not in keys!
Value 3 is not in keys!
Value 4 is not in keys!
Value 5 is not in keys!
{1: 4, 2: 2, 3: 4, 4: 2, 5: 2}

But how can I avoid the incorrect case and make them always behave as expected? Thank you in advance.


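Your code in thread_function constitutes a critical section whose execution needs to be serialized, so that only one process can be executing it at a time. Even what appears to be a single statement, shared_resource[val] += 1, consists of multiple bytecode instructions, and two processes can read the same initial value of shared_resource[val] and store the same updated value. In the same way, several processes running in parallel can each find that a key is not yet in the dictionary, and each will then store that same key.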
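To make the "multiple bytecode instructions" point concrete, here is a small sketch that disassembles the increment with the standard dis module. The helper name increment is made up for this illustration, and the exact opcode names vary between Python versions, so treat the printed list as indicative only.

```python
import dis

# Hypothetical helper that isolates the statement in question.
def increment(shared_resource, val):
    shared_resource[val] += 1

# Collect the opcode names the statement compiles to.
opnames = [ins.opname for ins in dis.get_instructions(increment)]
print(opnames)

# Reading the current value and storing the new one are separate
# instructions, so another process can run in between them.
print("STORE_SUBSCR" in opnames)
```

With a manager.dict() proxy the gap is wider still, because the read and the write are separate requests to the manager process. That is why the fix that follows holds one lock around the whole check-and-update.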

import multiprocessing

def init_processes(the_lock):
    global lock
    lock = the_lock

def thread_function(shared_resource : dict, val : int) -> None:

    with lock:
        if val not in shared_resource.keys():
            print(f"Value {val} is not in keys!")
            shared_resource[val] = 1

        shared_resource[val] += 1

def main():

    no_of_threads = 5
    manager       = multiprocessing.Manager()

    shared_resource = manager.dict()

    values = [1 , 1 , 2 , 1 , 3, 3, 3, 4, 5]

    lock = multiprocessing.Lock()
    with multiprocessing.Pool(no_of_threads, initializer=init_processes, initargs=(lock,)) as pool:

        pool.starmap(thread_function,
                     [ (shared_resource, val) for val in values],
                     chunksize=1)

    print(shared_resource)


if __name__ == "__main__": main()

Prints:

Value 1 is not in keys!
Value 2 is not in keys!
Value 3 is not in keys!
Value 5 is not in keys!
Value 4 is not in keys!
{1: 4, 2: 2, 3: 4, 5: 2, 4: 2}

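Note, however, that you have no control over how the tasks are dispatched, and therefore no control over the order in which the keys are stored.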
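If only the order of the final printout matters, one possible workaround is to sort the items once the pool has finished. The sketch below uses a plain dict holding the values printed above as a stand-in for the proxy; the names result and ordered are made up for this illustration. With the real proxy, shared_resource.items() could presumably be sorted the same way.

```python
# Stand-in for the contents of shared_resource after the pool has finished,
# in the insertion order shown in the output above.
result = {1: 4, 2: 2, 3: 4, 5: 2, 4: 2}

# Rebuild the dict with keys in ascending order; dicts keep insertion order.
ordered = dict(sorted(result.items()))
print(ordered)  # {1: 4, 2: 2, 3: 4, 4: 2, 5: 2}
```

This only changes how the result is displayed. The order in which the processes actually touch the dictionary is still up to the pool.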