使用多处理 Lock() 来防止生成损坏的图

Using multiprocessing Lock() to prevent corrupt plots from being generated

将 matplotlib 后端设置为 'PDF',我已将绘图合并到一个函数中。该函数的中心主题是生成图,如下所示:

def func(arg1,arg2,arg3,num):
    try:
        fig = pdf.savefig(num+arg1+arg2,bbox_inches = "tight")
        return fig
    except:
        return None

我已经能够通过以下实现以串行方式成功获得所需的 results(地块):

data = list(range(100))
results=[func(arg1,arg2,arg3,num) for num in data]

我尝试使用 pool.map() 和 pool.apply_async() 方法以并行化的方式实现它,如下所示:

pool.map() 实现:

if __name__ == "__main__":
    try:
        pool = Pool()    
        data = list(range(50))
        funct = partial(func,arg1,arg2,arg3)
        results = pool.map(funct, data)
    finally:
        pool.close()
        pool.join()

pool.async() 实现:

if __name__ == "__main__":
    try:
        pool = Pool()    
        results = []
        data = list(range(50))
        result_objects = [pool.apply_async(func, args=(arg1,arg2,arg3,num)) for num in data]
        results = [r.get() for r in result_objects]
    finally:
        pool.close()
        pool.join()

对于这两种并行实现,我注意到在 50 次模拟中,只有 6 次生成了可读的 pdf 文件,而其他的则损坏了。当我将模拟次数更改为 10 次时,只有 3 次生成可读的 pdf 文件,而其他的则已损坏。

我似乎不明白为什么只有少数情节制作得当,而其他情节却被破坏了。

我正在使用 4 核 Linux Ubuntu 18.04 机器执行多处理。

我遇到了 multiprocessing queue() 模块,它似乎负责主进程和子进程之间的通信。我猜想当前的通信方式存在一些错误,因此导致大多数迭代的图像损坏。

希望帮助将多处理 queue() 合并到代码中以解决此问题。

我假设 Queue() 可能有一些神奇的能力来消除代码中的皱纹。

经历了几次 posts,我意识到 Lock() 有可能解开我的痛苦。

以下修改后的代码工作正常:

pool.map() 实现:

def func(arg1,arg2,arg3,lock,num):
    with lock:
        #your code#

from multiprocessing import Pool, Lock, Manager
from functools import partial
import time

if __name__ == "__main__":
    try: 
        pool = Pool()
        m = Manager()
        lock = m.Lock()
        data = list(range(1000))
        funct = partial(func,arg1,arg2,arg3,lock)
        results = pool.map(funct, data)
    except:
        pool.close()
        pool.join()

pool.async() 实现:

def func(num,arg1,arg2,arg3,lock):
    with lock:
        # your code #

from multiprocessing import Pool, Lock, Manager
from functools import partial
import time

if __name__ == "__main__":
    try:
        pool = Pool()
        m = Manager()
        lock = m.Lock()
        results = []
        data = list(range(1000))
        result_objects = [pool.apply_async(func, args=(num,arg1,arg2,arg3,lock)) for num in data]
        results = [r.get() for r in result_objects]
    finally:
        pool.close()
        pool.join()