在 Python 中使用多处理计算坏点
Counting the dead pixel by using multiprocessing in Python
我不知道如何用多进程(multiprocessing)来统计坏点。到目前为止,我在不使用多进程的情况下已经实现了统计,但分析我们需要处理的 10 张图片大约需要 7 分钟……
import random
import time
from multiprocessing import Process, Queue, current_process, freeze_support
from PIL import Image, ImageDraw
# Open the ten input drawings, one module-level name per file.
# The original bound the name ``image2`` to both drawing_1 and drawing_2, so
# drawing_1 was overwritten before it could be used; every file now has its
# own name (image1 .. image10 <-> drawing_0 .. drawing_9).
image1 = Image.open('MA_HA1_drawing_0.png')
image2 = Image.open('MA_HA1_drawing_1.png')
image3 = Image.open('MA_HA1_drawing_2.png')
image4 = Image.open('MA_HA1_drawing_3.png')
image5 = Image.open('MA_HA1_drawing_4.png')
image6 = Image.open('MA_HA1_drawing_5.png')
image7 = Image.open('MA_HA1_drawing_6.png')
image8 = Image.open('MA_HA1_drawing_7.png')
image9 = Image.open('MA_HA1_drawing_8.png')
image10 = Image.open('MA_HA1_drawing_9.png')
def analyze_picture(image, x_range=(616, 6446), y_range=(756, 3712)):
    """Count the pixels whose red, green and blue values all differ from 1.

    Args:
        image: Object with a ``getpixel((x, y))`` method returning a pixel
            tuple whose first three items are (r, g, b), e.g. a PIL image.
        x_range: Half-open ``(start, stop)`` range of columns to scan.
        y_range: Half-open ``(start, stop)`` range of rows to scan.

    Returns:
        Number of scanned pixels with r != 1, g != 1 and b != 1.
    """
    # The random sleep copied from the multiprocessing demo helpers was
    # removed: it only delayed the result.
    getpixel = image.getpixel  # hoisted out of the hot loop
    counter = 0
    for x in range(*x_range):
        for y in range(*y_range):
            # [:3] tolerates a trailing alpha channel.
            r, g, b = getpixel((x, y))[:3]
            if r != 1 and g != 1 and b != 1:
                counter += 1
    return counter
def test():
    """Analyze the ten drawings one after another and print all counts.

    The original analysed ``image2`` twice, never looked at drawing_1, and
    printed only the first of the ten results. Each file is now opened by
    name, analysed exactly once, and every count is printed.
    """
    results = []
    for index in range(10):
        image = Image.open('MA_HA1_drawing_%d.png' % index)
        results.append(analyze_picture(image))
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(results)
if __name__ == '__main__':
    # Script entry point: enable multiprocessing support for frozen Windows
    # executables first, then run the analysis.
    freeze_support()
    test()
他们给了我们一些函数来理解多处理并将其用于我们的任务,但我不理解它们,也不知道如何使用它们。
def worker(input, output):
    """Process ``(func, args)`` jobs from *input* until ``'STOP'`` arrives.

    Each job is handed to ``calculate`` and the resulting string is put on
    *output*.
    """
    while True:
        job = input.get()
        if job == 'STOP':
            break
        func, args = job
        output.put(calculate(func, args))
def calculate(func, args):
    """Call ``func(*args)`` and describe the call and its result as a string.

    The string names the current process, the function, its arguments and
    the value it returned.
    """
    outcome = func(*args)
    process_name = current_process().name
    report = (process_name, func.__name__, args, outcome)
    return '%s says that %s%s = %s' % report
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    product = a * b
    return product
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    total = a + b
    return total
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
print i
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
print 'process ', i, ' is stopped'
编辑:新代码
import random
import time
from multiprocessing import Process, Queue, current_process, freeze_support
from PIL import Image, ImageDraw
def worker(input, output):
    """Process ``(func, args)`` jobs from *input* until ``'STOP'`` arrives.

    Each job is handed to ``calculate`` and the resulting string is put on
    *output*.
    """
    while True:
        job = input.get()
        if job == 'STOP':
            break
        func, args = job
        output.put(calculate(func, args))
def calculate(func, args):
    """Call ``func(args)`` and describe the call and its result as a string.

    Unlike the ``func(*args)`` variant, *args* is passed to *func* as one
    single argument. The string names the current process, the function,
    the argument and the returned value.
    """
    outcome = func(args)
    process_name = current_process().name
    report = (process_name, func.__name__, args, outcome)
    return '%s says that %s%s has %s dead pixels\n' % report
def analyze_picture(image_name):
t1 = time.clock()
image = Image.open(image_name)
time.sleep(0.5*random.random())
counter = 0
for x in range(616,6446):
for y in range(756,3712):
r,g,b = image.getpixel((x,y))
if r != 1 and g != 1 and b != 1:
counter += 1
t2 = time.clock()
dt = t2 - t1
print '\tThe process takes ',dt,' seconds.\n Result:\n'
return counter
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(analyze_picture, image_names[i]) for i in range(10)]
print TASKS1
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
print i
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
print 'process ', i, ' is stopped'
if __name__ == '__main__':
    # Build the ten file names read by test() through the module-level
    # ``image_names``, then start the run.
    image_names = []
    for i in range(10):
        image_names.append('MA_HA1_drawing_' + str(i) + '.png')
    freeze_support()
    test()
多处理背后的思想:
- 创建多个可以分布到不同内核并行执行的worker。
- 对于 multiprocessing,这些 worker 是拥有独立内存空间的进程(与线程相反)。
- 由于内存空间相互独立,它们无法通过内存进行通信(接收任务并发送结果)。因此需要使用队列进行进程间通信。
- 现在,任务通过队列分配给工作人员。
- 最后,收集工作人员通过队列发送的结果。
如果必须使用发布的代码,您可以按如下方式进行:
- 创建队列
- 启动工作进程
- 提交任务
在创建工人后提交任务非常重要。队列的缓冲区可能会变满并阻塞,直到从中取出某些东西,但只要还没有工作人员,就不会从队列中取出任何东西 --> 死锁。
由于您想并行处理所有图像,因此您的 TASKS1(注意它是复数,即一个任务列表)必须是 [(analyze_picture, (image1,)), (analyze_picture, (image2,)), ...]
(worker 期望每个任务是一个由函数和参数组成的元组,其中参数本身也是一个元组)。
- 获取并打印结果
- 告诉子进程停止
可能这就是你问的问题。
毕竟,提高性能(和代码可读性)还有三个方面:
- 进程间通信非常昂贵。因此,您应该尽量减少从工作人员传输和向工作人员传输的数据。
在您的情况下,这意味着仅传递图像名称而不是整个图像。此外,这会导致并行读入所有图像,因为工作人员会读取图像。
- 所有工作人员的东西都已在
multiprocessing.Pool
中实现,这将多处理的代码行减少到两行:
# Create a pool with NUMBER_OF_PROCESSES worker processes.
pool = multiprocessing.Pool(NUMBER_OF_PROCESSES)
# map() calls analyze_picture once per list item in the workers and returns
# the results in input order.
result = pool.map(analyze_picture, [image1, image2, ...])
- 最后但并非最不重要的一点是,逐像素迭代相当慢。使用 NumPy(或更高级别 SciPy),您可以大大加快它的速度。
最后,您的脚本可能如下所示,并且会比 7 分钟快得多:
import multiprocessing as mp
import numpy as np
from scipy import misc
def analyze_picture(imagename, x_range=(616, 6446), y_range=(756, 3712)):
    """Count the pixels whose red, green and blue values all differ from 1.

    Args:
        imagename: Path of the image file to read, or an already decoded
            array indexed as ``image[y, x, channel]``.
        x_range: Half-open ``(start, stop)`` range of columns to scan.
        y_range: Half-open ``(start, stop)`` range of rows to scan.

    Returns:
        Number of pixels in the region with r != 1, g != 1 and b != 1.
    """
    if isinstance(imagename, np.ndarray):
        image = imagename
    else:
        image = misc.imread(imagename)  # image[y, x, r/g/b]
    # The original indexed an undefined name ``a`` (NameError) and left the
    # slices open-ended; the bounds now match the pixel-loop version.
    # ``:3`` ignores a trailing alpha channel.
    region = image[y_range[0]:y_range[1], x_range[0]:x_range[1], :3]
    return int(np.count_nonzero((region != 1).all(axis=-1)))
def main():
    """Analyze the ten drawings in a process pool and print the counts."""
    image_names = ["MA_HA1_drawing_{}.png".format(i) for i in range(10)]
    pool = mp.Pool()  # default: number of logical cores
    try:
        result = pool.map(analyze_picture, image_names)
    finally:
        # The original never released the pool; close it and wait for the
        # worker processes to exit.
        pool.close()
        pool.join()
    print(result)
if __name__ == '__main__':
    # Script entry point: enable multiprocessing support for frozen Windows
    # executables first, then run the pool-based analysis.
    mp.freeze_support()
    main()
我不确定您的图像是什么样的({r,g,b} != 1 这个条件很奇怪),但在 scipy.misc.imread 的参考文档中,您可以找到适合您图像的读取模式(mode)。
我不知道如何用多进程(multiprocessing)来统计坏点。到目前为止,我在不使用多进程的情况下已经实现了统计,但分析我们需要处理的 10 张图片大约需要 7 分钟……
import random
import time
from multiprocessing import Process, Queue, current_process, freeze_support
from PIL import Image, ImageDraw
# Open the ten input drawings, one module-level name per file.
# The original bound the name ``image2`` to both drawing_1 and drawing_2, so
# drawing_1 was overwritten before it could be used; every file now has its
# own name (image1 .. image10 <-> drawing_0 .. drawing_9).
image1 = Image.open('MA_HA1_drawing_0.png')
image2 = Image.open('MA_HA1_drawing_1.png')
image3 = Image.open('MA_HA1_drawing_2.png')
image4 = Image.open('MA_HA1_drawing_3.png')
image5 = Image.open('MA_HA1_drawing_4.png')
image6 = Image.open('MA_HA1_drawing_5.png')
image7 = Image.open('MA_HA1_drawing_6.png')
image8 = Image.open('MA_HA1_drawing_7.png')
image9 = Image.open('MA_HA1_drawing_8.png')
image10 = Image.open('MA_HA1_drawing_9.png')
def analyze_picture(image, x_range=(616, 6446), y_range=(756, 3712)):
    """Count the pixels whose red, green and blue values all differ from 1.

    Args:
        image: Object with a ``getpixel((x, y))`` method returning a pixel
            tuple whose first three items are (r, g, b), e.g. a PIL image.
        x_range: Half-open ``(start, stop)`` range of columns to scan.
        y_range: Half-open ``(start, stop)`` range of rows to scan.

    Returns:
        Number of scanned pixels with r != 1, g != 1 and b != 1.
    """
    # The random sleep copied from the multiprocessing demo helpers was
    # removed: it only delayed the result.
    getpixel = image.getpixel  # hoisted out of the hot loop
    counter = 0
    for x in range(*x_range):
        for y in range(*y_range):
            # [:3] tolerates a trailing alpha channel.
            r, g, b = getpixel((x, y))[:3]
            if r != 1 and g != 1 and b != 1:
                counter += 1
    return counter
def test():
    """Analyze the ten drawings one after another and print all counts.

    The original analysed ``image2`` twice, never looked at drawing_1, and
    printed only the first of the ten results. Each file is now opened by
    name, analysed exactly once, and every count is printed.
    """
    results = []
    for index in range(10):
        image = Image.open('MA_HA1_drawing_%d.png' % index)
        results.append(analyze_picture(image))
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(results)
if __name__ == '__main__':
    # Script entry point: enable multiprocessing support for frozen Windows
    # executables first, then run the analysis.
    freeze_support()
    test()
他们给了我们一些函数来理解多处理并将其用于我们的任务,但我不理解它们,也不知道如何使用它们。
def worker(input, output):
    """Process ``(func, args)`` jobs from *input* until ``'STOP'`` arrives.

    Each job is handed to ``calculate`` and the resulting string is put on
    *output*.
    """
    while True:
        job = input.get()
        if job == 'STOP':
            break
        func, args = job
        output.put(calculate(func, args))
def calculate(func, args):
    """Call ``func(*args)`` and describe the call and its result as a string.

    The string names the current process, the function, its arguments and
    the value it returned.
    """
    outcome = func(*args)
    process_name = current_process().name
    report = (process_name, func.__name__, args, outcome)
    return '%s says that %s%s = %s' % report
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    product = a * b
    return product
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    pause = 0.5 * random.random()
    time.sleep(pause)
    total = a + b
    return total
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
print i
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
print 'process ', i, ' is stopped'
编辑:新代码
import random
import time
from multiprocessing import Process, Queue, current_process, freeze_support
from PIL import Image, ImageDraw
def worker(input, output):
    """Process ``(func, args)`` jobs from *input* until ``'STOP'`` arrives.

    Each job is handed to ``calculate`` and the resulting string is put on
    *output*.
    """
    while True:
        job = input.get()
        if job == 'STOP':
            break
        func, args = job
        output.put(calculate(func, args))
def calculate(func, args):
    """Call ``func(args)`` and describe the call and its result as a string.

    Unlike the ``func(*args)`` variant, *args* is passed to *func* as one
    single argument. The string names the current process, the function,
    the argument and the returned value.
    """
    outcome = func(args)
    process_name = current_process().name
    report = (process_name, func.__name__, args, outcome)
    return '%s says that %s%s has %s dead pixels\n' % report
def analyze_picture(image_name):
t1 = time.clock()
image = Image.open(image_name)
time.sleep(0.5*random.random())
counter = 0
for x in range(616,6446):
for y in range(756,3712):
r,g,b = image.getpixel((x,y))
if r != 1 and g != 1 and b != 1:
counter += 1
t2 = time.clock()
dt = t2 - t1
print '\tThe process takes ',dt,' seconds.\n Result:\n'
return counter
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(analyze_picture, image_names[i]) for i in range(10)]
print TASKS1
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
print i
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
print 'process ', i, ' is stopped'
if __name__ == '__main__':
    # Build the ten file names read by test() through the module-level
    # ``image_names``, then start the run.
    image_names = []
    for i in range(10):
        image_names.append('MA_HA1_drawing_' + str(i) + '.png')
    freeze_support()
    test()
多处理背后的思想:
- 创建多个可以分布到不同内核并行执行的worker。
- 对于 multiprocessing,这些 worker 是拥有独立内存空间的进程(与线程相反)。
- 由于内存空间相互独立,它们无法通过内存进行通信(接收任务并发送结果)。因此需要使用队列进行进程间通信。
- 现在,任务通过队列分配给工作人员。
- 最后,收集工作人员通过队列发送的结果。
如果必须使用发布的代码,您可以按如下方式进行:
- 创建队列
- 启动工作进程
- 提交任务
在创建工人后提交任务非常重要。队列的缓冲区可能会变满并阻塞,直到从中取出某些东西,但只要还没有工作人员,就不会从队列中取出任何东西 --> 死锁。
由于您想并行处理所有图像,因此您的 TASKS1(注意它是复数,即一个任务列表)必须是 [(analyze_picture, (image1,)), (analyze_picture, (image2,)), ...]
(worker 期望每个任务是一个由函数和参数组成的元组,其中参数本身也是一个元组)。
- 获取并打印结果
- 告诉子进程停止
可能这就是你问的问题。
毕竟,提高性能(和代码可读性)还有三个方面:
- 进程间通信非常昂贵。因此,您应该尽量减少从工作人员传输和向工作人员传输的数据。
在您的情况下,这意味着仅传递图像名称而不是整个图像。此外,这样所有图像也会被并行读入,因为图像是由 worker 读取的。
- 所有与 worker 相关的功能都已在
multiprocessing.Pool
中实现,这将多处理的代码行减少到两行:
# The source had both statements fused on one line, which is a syntax error;
# they are separate statements.
# Create a pool with NUMBER_OF_PROCESSES worker processes.
pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
# map() calls analyze_picture once per list item in the workers and returns
# the results in input order.
result = pool.map(analyze_picture, [image1, image2, ...])
- 最后但并非最不重要的一点是,逐像素迭代相当慢。使用 NumPy(或更高级别 SciPy),您可以大大加快它的速度。
最后,您的脚本可能如下所示,并且会比 7 分钟快得多:
import multiprocessing as mp
import numpy as np
from scipy import misc
def analyze_picture(imagename, x_range=(616, 6446), y_range=(756, 3712)):
    """Count the pixels whose red, green and blue values all differ from 1.

    Args:
        imagename: Path of the image file to read, or an already decoded
            array indexed as ``image[y, x, channel]``.
        x_range: Half-open ``(start, stop)`` range of columns to scan.
        y_range: Half-open ``(start, stop)`` range of rows to scan.

    Returns:
        Number of pixels in the region with r != 1, g != 1 and b != 1.
    """
    if isinstance(imagename, np.ndarray):
        image = imagename
    else:
        image = misc.imread(imagename)  # image[y, x, r/g/b]
    # The original indexed an undefined name ``a`` (NameError) and left the
    # slices open-ended; the bounds now match the pixel-loop version.
    # ``:3`` ignores a trailing alpha channel.
    region = image[y_range[0]:y_range[1], x_range[0]:x_range[1], :3]
    return int(np.count_nonzero((region != 1).all(axis=-1)))
def main():
    """Analyze the ten drawings in a process pool and print the counts."""
    image_names = ["MA_HA1_drawing_{}.png".format(i) for i in range(10)]
    pool = mp.Pool()  # default: number of logical cores
    try:
        result = pool.map(analyze_picture, image_names)
    finally:
        # The original never released the pool; close it and wait for the
        # worker processes to exit.
        pool.close()
        pool.join()
    print(result)
if __name__ == '__main__':
    # Script entry point: enable multiprocessing support for frozen Windows
    # executables first, then run the pool-based analysis.
    mp.freeze_support()
    main()
我不确定您的图像是什么样的({r,g,b} != 1 这个条件很奇怪),但在 scipy.misc.imread 的参考文档中,您可以找到适合您图像的读取模式(mode)。