Handle blocking operations efficiently in python
I'm using Python and OpenCV to get video from an RTSP stream. I grab single frames from the stream and save them to the file system.

I wrote a StreamingWorker which handles frame grabbing and saving. There is also a StreamPool that holds all the stream objects. My thinking was that since a StreamingWorker is always running, there should be exactly one per core so as to use as much of the CPU as possible, and the StreamPool would then hand VideoCapture objects to whichever StreamingWorker is available.

The problem is that most of the time the script is running, it is blocking:
import os
import time
import threading
import cv2 as cv

class StreamingWorker(object):

    def __init__(self, stream_pool):
        self.stream_pool = stream_pool
        self.start_loop()

    def start_loop(self):
        while True:
            try:
                # getting a stream from the read_strategy
                stream_object = self.stream_pool.next()

                # getting an image from the stream
                _, frame = stream_object['stream'].read()

                # saving image to file system
                cv.imwrite(os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time())), frame)

            except ValueError as e:
                print('[error] {}'.format(e))

class StreamPool(object):

    def __init__(self, streams):
        self.streams = [{'feed': stream, 'stream': cv.VideoCapture(stream)} for stream in streams]
        self.current_stream = 0
        self.lock = threading.RLock()

    def next(self):
        self.lock.acquire()
        if(self.current_stream + 1 >= len(self.streams)):
            self.current_stream = 0
        else:
            self.current_stream += 1
        result = self.streams[self.current_stream]
        self.lock.release()
        return result

def get_cores():
    # This function returns the number of available cores
    import multiprocessing
    return multiprocessing.cpu_count()

def start(stream_pool):
    StreamingWorker(stream_pool)

def divide_list(input_list, amount):
    # This function divides the whole list into a list of lists
    result = [[] for _ in range(amount)]
    for i in range(len(input_list)):
        result[i % len(result)].append(input_list[i])
    return result

if __name__ == '__main__':
    stream_list = ['rtsp://some/stream1', 'rtsp://some/stream2', 'rtsp://some/stream3']
    num_cores = get_cores()
    divided_streams = divide_list(stream_list, num_cores)

    for streams in divided_streams:
        stream_pool = StreamPool(streams)
        thread = threading.Thread(target=start, args=(stream_pool,))
        thread.start()
What I didn't take into account when I designed this is that most of these operations are blocking, like so:
# Getting a frame blocks
_, frame = stream_object['stream'].read()

# Writing to the file system blocks
cv.imwrite(os.path.join('result', stream_object['feed'], '{}.jpg'.format(time.time())), frame)
The problem with spending so much time blocking is that most of the processing power is wasted. I thought of using ThreadPoolExecutor futures, but I can't seem to reach my goal of using as many processing cores as possible; maybe I'm not starting enough threads.

Is there a standard way of handling blocking operations so as to make the best use of the cores' processing power? I'm fine with a language-agnostic answer.
I ended up using a ThreadPoolExecutor together with the add_done_callback(fn) function.
from concurrent.futures import ThreadPoolExecutor

class StreamingWorker(object):

    def __init__(self, stream_pool):
        self.stream_pool = stream_pool
        self.thread_pool = ThreadPoolExecutor(10)
        self.start_loop()

    def start_loop(self):

        def done(fn):
            print('[info] future done')

        def save_image(stream):
            # getting an image from the stream
            _, frame = stream['stream'].read()

            # saving image to file system
            cv.imwrite(os.path.join('result', stream['feed'], '{}.jpg'.format(time.time())), frame)

        while True:
            try:
                # getting a stream from the read_strategy
                stream_object = self.stream_pool.next()

                # scheduling the work onto the thread pool
                self.thread_pool.submit(save_image, stream_object).add_done_callback(done)

            except ValueError as e:
                print('[error] {}'.format(e))
I don't actually want to do anything once a future completes, but if I called result() instead, the while True loop would block on it, which would also defeat the whole purpose of using a thread pool.
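If you do want to notice failures from the worker function without blocking the submitting loop, the finished future can be inspected inside the callback instead: Future.exception() returns immediately once the future is done, unlike calling result() from the loop. A minimal sketch of such a callback (my own variation, not part of the code above):

def done(fn):
    # fn has already finished by the time the callback runs, so exception()
    # returns immediately and never blocks the while True loop.
    exc = fn.exception()
    if exc is not None:
        print('[error] future failed: {}'.format(exc))
    else:
        print('[info] future done')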
Side note: I had to add a threading.RLock() around the call to self.stream_pool.next() because apparently OpenCV can't handle calls from multiple threads.
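For what it's worth, the same lock can also be used as a context manager, which guarantees it is released even if the indexing raises; here is a small sketch of StreamPool.next() written that way (same round-robin behaviour, just a different idiom):

def next(self):
    with self.lock:  # acquired here, released automatically on exit
        self.current_stream = (self.current_stream + 1) % len(self.streams)
        return self.streams[self.current_stream]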