How to use Python multiprocessing queue to access GPU (through PyOpenCL)?
My code takes a long time to run, so I have been looking into Python's multiprocessing library to speed things up. My code also has a few steps that use the GPU via PyOpenCL. The problem is that if I set multiple processes running at once, they all end up trying to use the GPU at the same time, and that often results in one or more of the processes throwing an exception and quitting.
To work around this, I stagger the start of each process so that they are less likely to collide with one another:
process_list = []
num_procs = 4
# Break the data into chunks so each process gets its own chunk of the data
data_chunks = chunks(data, num_procs)
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    # Instantiate the process
    p = multiprocessing.Process(target=test, args=(arg1, arg2))
    # Stick the process in a list so that it remains accessible
    process_list.append(p)
# Start the processes
j = 1
for process in process_list:
    print('\nStarting process %i' % j)
    process.start()
    time.sleep(5)
    j += 1
for process in process_list:
    process.join()
I have also wrapped a try/except loop around the function that calls the GPU, so that if two processes try to access it at the same time, the one that fails to get access waits a couple of seconds and then tries again:
wait = 2
n = 0
while True:
    try:
        gpu_out = GPU_Obj.GPU_fn(params)
    except:
        time.sleep(wait)
        print('\n Waiting for GPU memory...')
        n += 1
        if n == 5:
            raise Exception('Tried and failed %i times to allocate memory for opencl kernel.' % n)
        continue
    break
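For what it's worth, the retry loop above can be factored into a small helper. This is only a sketch of the same retry-on-exception idea; `flaky_gpu_fn` is a hypothetical stand-in for `GPU_Obj.GPU_fn(params)` that fails twice and then succeeds, so the example is runnable without a GPU:

```python
import time

def retry(fn, attempts=5, wait=2):
    """Call fn(), retrying on any exception up to `attempts` times."""
    for n in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if n == attempts:
                raise RuntimeError(
                    'Tried and failed %i times to allocate memory for opencl kernel.' % n)
            print('\n Waiting for GPU memory...')
            time.sleep(wait)

# Hypothetical stand-in for GPU_Obj.GPU_fn: fails twice, then succeeds.
calls = {'n': 0}
def flaky_gpu_fn():
    calls['n'] += 1
    if calls['n'] < 3:
        raise MemoryError('GPU busy')
    return 'ok'

result = retry(flaky_gpu_fn, wait=0)
print(result)  # 'ok' after two retries
```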
This workaround is very clunky, and even though it works most of the time, processes still occasionally throw an exception. I feel there should be a more efficient/elegant solution using multiprocessing.Queue or something similar, but I am not sure how to integrate it with PyOpenCL for GPU access.
It sounds like you could use a multiprocessing.Lock to synchronize access to the GPU:
data_chunks = chunks(data, num_procs)
lock = multiprocessing.Lock()
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    # Instantiate the process, passing it the shared lock
    p = multiprocessing.Process(target=test, args=(arg1, arg2, lock))
    ...
Then, inside test, wherever you access the GPU:
with lock:  # Only one process will be allowed in this block at a time.
    gpu_out = GPU_Obj.GPU_fn(params)
Edit:
To do this with a pool, you could do it like this:
# At global scope
lock = None

def init(_lock):
    global lock
    lock = _lock

data_chunks = chunks(data, num_procs)
lock = multiprocessing.Lock()
# Create the pool once; the initializer runs in every worker process
pool = multiprocessing.Pool(initializer=init, initargs=(lock,))
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    pool.apply(test, args=(arg1, arg2))
    ...
Or:
data_chunks = chunks(data, num_procs)
m = multiprocessing.Manager()
lock = m.Lock()
# Create the pool once, outside the loop
pool = multiprocessing.Pool()
for chunk in data_chunks:
    if len(chunk) == 0:
        continue
    # A manager lock is a proxy, so it can be passed as a regular argument
    pool.apply(test, args=(arg1, arg2, lock))