在 Python 中使用多处理计算坏点

Counting the dead pixel by using multiprocessing in Python

我不知道如何用多处理(multiprocessing)来统计坏点。目前我只实现了不使用多处理的版本,但分析我们需要处理的 10 张图片大约要花 7 分钟……

import random
import time

from multiprocessing import Process, Queue, current_process, freeze_support
from PIL import Image, ImageDraw

# Open all ten drawings once; IMAGES[i] is MA_HA1_drawing_i.png.
IMAGES = [Image.open('MA_HA1_drawing_%d.png' % i) for i in range(10)]

# The individual names keep their previous bindings so existing code still
# works: image1 -> drawing_0, and imageN -> drawing_N for N = 2..9.
# drawing_1 used to be opened and then immediately lost because `image2` was
# assigned twice; it is now reachable as IMAGES[1].
image1 = IMAGES[0]
image2 = IMAGES[2]
image3 = IMAGES[3]
image4 = IMAGES[4]
image5 = IMAGES[5]
image6 = IMAGES[6]
image7 = IMAGES[7]
image8 = IMAGES[8]
image9 = IMAGES[9]

def analyze_picture(image, x_range=(616, 6446), y_range=(756, 3712)):
    """Count the "dead" pixels of *image* inside a rectangular region.

    image:   object with a PIL-style ``getpixel((x, y))`` that returns a
             tuple of channel values (RGB or RGBA).
    x_range: (start, stop) of the columns to scan, half-open like ``range``.
    y_range: (start, stop) of the rows to scan, half-open like ``range``.

    A pixel is counted when none of its first three channels equals 1.
    Returns the count as an int.
    """
    # NOTE(review): the "no channel equals 1" criterion is kept from the
    # original code; confirm it really is how dead pixels are marked.
    getpixel = image.getpixel  # hoisted: called ~17 million times by default
    counter = 0
    for x in range(*x_range):
        for y in range(*y_range):
            # Slice to R, G, B so RGBA pixels do not break the unpacking.
            r, g, b = getpixel((x, y))[:3]
            if r != 1 and g != 1 and b != 1:
                counter += 1
    return counter

def _count_dead_pixels_in_file(image_name):
    """Open *image_name* and count its dead pixels (runs inside a pool worker)."""
    return analyze_picture(Image.open(image_name))


def test():
    """Count the dead pixels of all ten drawings in parallel and print them.

    Each pool worker opens its own file, so only the short file name is sent
    to the worker process and only an int count is sent back.
    """
    from multiprocessing import Pool  # not part of this file's import line

    NUMBER_OF_PROCESSES = 4
    image_names = ['MA_HA1_drawing_%d.png' % i for i in range(10)]

    pool = Pool(processes=NUMBER_OF_PROCESSES)
    try:
        # map() preserves input order: results[i] belongs to image_names[i].
        results = pool.map(_count_dead_pixels_in_file, image_names)
    finally:
        pool.close()
        pool.join()
    print(results)


if __name__ == '__main__':
    freeze_support()
    test()

他们给了我们一些函数来理解多处理并将其用于我们的任务,但我不理解它们,也不知道如何使用它们。

def worker(input, output):
    """Process (func, args) jobs from *input* until the 'STOP' sentinel.

    Each job is handed to calculate() and the resulting string is put on
    *output*.
    """
    while True:
        job = input.get()
        if job == 'STOP':
            break
        func, args = job
        output.put(calculate(func, args))

def calculate(func, args):
    """Call ``func(*args)`` and return a one-line report naming the process."""
    value = func(*args)
    who = current_process().name
    template = '%s says that %s%s = %s'
    return template % (who, func.__name__, args, value)

def mul(a, b):
    """Return ``a * b`` after a random pause of up to 0.5 s (simulated work)."""
    pause = random.random() * 0.5
    time.sleep(pause)
    product = a * b
    return product

def plus(a, b):
    """Return ``a + b`` after a random pause of up to 0.5 s (simulated work)."""
    pause = random.random() * 0.5
    time.sleep(pause)
    total = a + b
    return total

 # Create queues
    # NOTE(review): this is the body of a driver function whose `def` line is
    # not shown; it expects TASKS1, TASKS2, NUMBER_OF_PROCESSES and worker()
    # to be in scope. worker() unpacks every task as (func, args), so each
    # TASKS entry must be a (function, argument-tuple) pair -- a list of plain
    # values would fail to unpack inside the worker.
    task_queue = Queue()   # jobs for the workers: (func, args) pairs or 'STOP'
    done_queue = Queue()   # result strings sent back by the workers

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    # NOTE(review): the Process objects are not stored, so they can never be
    # join()ed later.
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()
        print i

    # Get and print results
    # One result per submitted task, in the order workers finish them.
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()   # blocks until some worker delivers a result

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    # One 'STOP' per worker: each worker leaves its loop on the first 'STOP' it reads.
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')
        print 'process ', i, ' is stopped'   # NOTE(review): printed when the sentinel is queued, not when the process has exited

编辑:新代码

import random
import time

from multiprocessing import Process, Queue, current_process, freeze_support
from PIL import Image, ImageDraw


def worker(input, output):
    """Process (func, args) jobs from *input* until the 'STOP' sentinel.

    Each job is handed to calculate() and the resulting string is put on
    *output*.
    """
    while True:
        job = input.get()
        if job == 'STOP':
            break
        func, args = job
        output.put(calculate(func, args))

def calculate(func, args):
    """Call ``func(args)`` and return a one-line dead-pixel report.

    *args* is passed to *func* as a single argument (here: an image file
    name). The report names the calling process and shows the call as
    ``func(arg)``; the old ``%s%s`` layout was designed for tuple arguments
    and glued a plain string argument onto the function name.
    """
    result = func(args)
    return '%s says that %s(%r) has %s dead pixels\n' % \
        (current_process().name, func.__name__, args, result)

def analyze_picture(image_name, x_range=(616, 6446), y_range=(756, 3712)):
    """Open *image_name* and count its "dead" pixels inside a region.

    x_range: (start, stop) of the columns to scan, half-open like ``range``.
    y_range: (start, stop) of the rows to scan, half-open like ``range``.

    A pixel is counted when none of its first three channels equals 1.
    Prints the elapsed wall-clock time and returns the count as an int.
    """
    # time.time() exists on every Python version; time.clock() was removed in 3.8.
    t1 = time.time()
    image = Image.open(image_name)
    # NOTE(review): the "no channel equals 1" criterion is kept from the
    # original code; confirm it really is how dead pixels are marked.
    getpixel = image.getpixel  # hoisted: called ~17 million times by default
    counter = 0
    for x in range(*x_range):
        for y in range(*y_range):
            # Slice to R, G, B so RGBA pixels do not break the unpacking.
            r, g, b = getpixel((x, y))[:3]
            if r != 1 and g != 1 and b != 1:
                counter += 1

    dt = time.time() - t1
    print('\tThe process takes %s seconds.\n Result:\n' % dt)
    return counter

def test():
    """Count dead pixels in every file of the global ``image_names`` list.

    Starts NUMBER_OF_PROCESSES worker processes that read (func, arg) jobs
    from a task queue and put result strings on a done queue. Prints the
    results in completion order, then stops and joins the workers.

    Relies on the module-level ``image_names`` (assigned under the
    ``__main__`` guard below), ``worker`` and ``analyze_picture``.
    """
    NUMBER_OF_PROCESSES = 4

    # One task per file; the count follows the list instead of a hard-coded 10.
    tasks = [(analyze_picture, name) for name in image_names]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Start the workers first so tasks are consumed as soon as they are queued.
    workers = [Process(target=worker, args=(task_queue, done_queue))
               for _ in range(NUMBER_OF_PROCESSES)]
    for proc in workers:
        proc.start()

    # Submit tasks
    for task in tasks:
        task_queue.put(task)

    # Exactly one result arrives per task, in whatever order workers finish.
    print('Unordered results:')
    for _ in tasks:
        print('\t%s' % done_queue.get())

    # One 'STOP' sentinel per worker, then wait for each worker to exit so the
    # "stopped" message is true when it is printed. All results have already
    # been read, so joining cannot hang on unflushed queue data.
    for _ in workers:
        task_queue.put('STOP')
    for i, proc in enumerate(workers):
        proc.join()
        print('process %d is stopped' % i)


if __name__ == '__main__':
    # freeze_support() should come straight after the __main__ check.
    freeze_support()
    image_names = ['MA_HA1_drawing_' + str(i) + '.png' for i in range(10)]
    test()

多处理背后的思想:

  • 创建多个 worker(工作进程),它们可以分布到不同的 CPU 核心上并行执行。
  • 在多进程(multiprocessing)中,这些 worker 是各自拥有独立内存空间的进程(这一点与线程不同)。
  • 由于内存空间彼此独立,它们无法通过共享内存进行通信(即接收任务并返回结果)。因此需要用队列来进行进程间通信。
  • 任务通过队列分配给各个工作进程。
  • 最后,收集工作进程通过队列发回的结果。

如果必须使用发布的代码,您可以按如下方式进行:

  1. 创建队列
  2. 启动工作进程
  3. 提交任务
    在创建工作进程之后再提交任务非常重要。队列的缓冲区可能会被填满并阻塞,直到有数据被取出;但只要还没有工作进程,就不会有任何数据被取出 --> 死锁。(注:对于默认没有容量上限的 multiprocessing.Queue,put() 本身并不会阻塞,不过先启动工作进程再提交任务仍是稳妥的做法。)
    由于你想并行处理所有图像,你的 TASKS1(注意名称是复数,即一组任务)必须是 [(analyze_picture, (image1,)), (analyze_picture, (image2,)), ...](worker 需要的是一个由函数和参数元组组成的元组)。
  4. 获取并打印结果
  5. 告诉子进程停止

这大概就是你想问的内容。

除此之外,还有三点可以进一步提升性能(以及代码可读性):

  • 进程间通信的开销非常大。因此,应尽量减少在主进程与工作进程之间传输的数据量。
    在你的情况下,这意味着只传递图像文件名,而不是整幅图像。此外,由于图像改由工作进程自己读取,读入图像这一步也会并行进行。
  • 工作进程相关的全部逻辑都已在 multiprocessing.Pool 中实现,使用它可以把多进程部分的代码缩减为两行:
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    result = pool.map(analyze_picture, [image1, image2, ...])
  • 最后但同样重要的是,逐像素迭代相当慢。使用 NumPy(或构建在其之上的 SciPy),可以大幅提升速度。

最后,您的脚本可能如下所示,并且会比 7 分钟快得多:

import multiprocessing as mp
import numpy as np
from scipy import misc

def analyze_picture(imagename):
    """Count the "dead" pixels of one image.

    imagename: path of an image file, or an already-loaded NumPy array
               indexed as image[y, x, channel] (RGB or RGBA).

    Scans rows 756:3712 and columns 616:6446 (the same region as the
    pixel-by-pixel loops in the question) and counts pixels where none of
    the R, G, B channels equals 1. Returns the count as an int.
    """
    if isinstance(imagename, np.ndarray):
        image = imagename
    else:
        # NOTE: scipy.misc.imread was removed in SciPy 1.2; on newer SciPy
        # versions load the file with another reader (e.g. imageio.imread).
        image = misc.imread(imagename)    # image[y, x, r/g/b]
    region = image[756:3712, 616:6446, :3]
    # A pixel counts only if all three channels differ from 1.
    return int(np.count_nonzero(np.all(region != 1, axis=-1)))

def main():
    """Count dead pixels in MA_HA1_drawing_0.png .. _9.png in parallel and print the list."""
    pool = mp.Pool()                  # default: number of logical cores
    try:
        # map() preserves input order: result[i] belongs to drawing i.
        result = pool.map(analyze_picture, ["MA_HA1_drawing_{}.png".format(i)
                                            for i in range(10)])
    finally:
        # Shut the workers down explicitly; the multiprocessing docs warn
        # against relying on garbage collection to finalize a Pool.
        pool.close()
        pool.join()
    print(result)


if __name__ == '__main__':
    mp.freeze_support()
    main()

我不确定你的图像是什么样的({r,g,b} != 1 这个条件看起来很奇怪),但你可以在 scipy.misc.imread 的参考文档中找到适合你图像的读取模式(mode)。注意:scipy.misc.imread 已在 SciPy 1.2 中移除,在较新的版本中可以改用 imageio.imread 等其他读取函数。