Python 锁定临界区
Python Locking Critical Section
我正在尝试使用 Python 的 multiprocessing 库来并发处理一批 "tests"。测试列表保存在变量 test_files 中。我希望每个工作进程从 test_files 中取出(并移除)一个测试,然后对它调用 process_test 函数。然而,当我运行这段代码时,两个进程运行了同一个测试。看来我没有以线程安全的方式访问 test_files。我做错了什么?
代码
def process_worker(lock, test_files):
    """Pop tests off ``test_files`` one at a time and run each until none remain.

    Args:
        lock: Mutex guarding access to ``test_files``; used as a context
            manager, so it is released even if the pop raises.
        test_files: Mutable list of pending tests; entries are removed
            with ``pop()``.

    Returns:
        None, once ``test_files`` is found empty.
    """
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section: the emptiness check and the pop must happen
        # together under the lock.
        with lock:
            if len(test_files) != 0:
                test_file = test_files.pop()

        if test_file is None:
            # No more tests to process
            return

        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Running test {0} on worker {1}".format(
            test_file, multiprocessing.current_process().name))
        process_test(test_file)
# One mutex handed to every worker
lock = multiprocessing.Lock()

# Build both workers up front; each gets the same lock and the same test_files.
# NOTE(review): test_files is defined outside this snippet - presumably a plain
# list, which is not shared between processes; confirm against the caller.
p1, p2 = [
    multiprocessing.Process(target=process_worker, name=worker_name, args=(lock, test_files))
    for worker_name in ("Process 1", "Process 2")
]

# Launch the workers in order
for proc in (p1, p2):
    proc.start()

# Wait here until each worker has exited
for proc in (p1, p2):
    proc.join()
输出
Running test "BIT_Test" on worker Process 1
Running test "BIT_Test" on worker Process 2
尝试像这样共享列表不是正确的方法。您应该使用进程安全的数据结构,例如 multiprocessing.Queue,或者更好的是,使用 multiprocessing.Pool,让它替您处理排队。您要做的事情非常适合使用 Pool.map:
import multiprocessing
def process_worker(test_file):
    """Announce which worker process picked up ``test_file``, then run it.

    Args:
        test_file: The single test to hand to ``process_test``.
    """
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("Running test {0} on worker {1}".format(
        test_file, multiprocessing.current_process().name))
    process_test(test_file)
if __name__ == "__main__":
    # The guard keeps child processes from re-creating the pool when they
    # re-import the main module (as happens with the "spawn" start method,
    # e.g. on Windows).
    p = multiprocessing.Pool(2)  # 2 processes in the pool
    # map puts each item from test_files in a Queue, lets the
    # two processes in our pool pull each item from the Queue,
    # and then execute process_worker with that item as an argument.
    p.map(process_worker, test_files)
    # No more work will be submitted; wait for the pool's workers to exit.
    p.close()
    p.join()
简单多了!
您也可以使用 multiprocessing.Manager 创建一个可在进程间共享的列表:
import multiprocessing
def process_worker(lock, test_files):
    """Pop tests off ``test_files`` one at a time and report each until none remain.

    Args:
        lock: Mutex guarding access to ``test_files``; used as a context
            manager, so it is released even if the pop raises.
        test_files: Mutable list of pending tests; entries are removed
            with ``pop()``.

    Returns:
        None, once ``test_files`` is found empty.
    """
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section: the emptiness check and the pop must happen
        # together under the lock.
        with lock:
            if len(test_files) != 0:
                test_file = test_files.pop()

        if test_file is None:
            # No more tests to process
            return

        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Running test %s on worker %s" % (
            test_file, multiprocessing.current_process().name))
        # process_test(test_file)  # the real work is disabled in this demo
if __name__ == "__main__":
    # The guard keeps child processes from re-running this setup when they
    # re-import the main module (as happens with the "spawn" start method,
    # e.g. on Windows).

    # Mutex for workers
    lock = multiprocessing.Lock()
    # The list comes from a Manager, so both workers operate on the same
    # underlying list rather than on private copies.
    manager = multiprocessing.Manager()
    test_files = manager.list(['f1', 'f2', 'f3'])
    # Declare our workers
    p1 = multiprocessing.Process(target=process_worker, name="Process 1", args=(lock, test_files))
    p2 = multiprocessing.Process(target=process_worker, name="Process 2", args=(lock, test_files))
    # Start processing
    p1.start()
    p2.start()
    # Block until both workers finish
    p1.join()
    p2.join()
我正在尝试使用 Python 的 multiprocessing 库来并发处理一批 "tests"。测试列表保存在变量 test_files 中。我希望每个工作进程从 test_files 中取出(并移除)一个测试,然后对它调用 process_test 函数。然而,当我运行这段代码时,两个进程运行了同一个测试。看来我没有以线程安全的方式访问 test_files。我做错了什么?
代码
def process_worker(lock, test_files):
    """Pop tests off ``test_files`` one at a time and run each until none remain.

    Args:
        lock: Mutex guarding access to ``test_files``; used as a context
            manager, so it is released even if the pop raises.
        test_files: Mutable list of pending tests; entries are removed
            with ``pop()``.

    Returns:
        None, once ``test_files`` is found empty.
    """
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section: the emptiness check and the pop must happen
        # together under the lock.
        with lock:
            if len(test_files) != 0:
                test_file = test_files.pop()

        if test_file is None:
            # No more tests to process
            return

        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Running test {0} on worker {1}".format(
            test_file, multiprocessing.current_process().name))
        process_test(test_file)
# One mutex handed to every worker
lock = multiprocessing.Lock()

# Build both workers up front; each gets the same lock and the same test_files.
# NOTE(review): test_files is defined outside this snippet - presumably a plain
# list, which is not shared between processes; confirm against the caller.
p1, p2 = [
    multiprocessing.Process(target=process_worker, name=worker_name, args=(lock, test_files))
    for worker_name in ("Process 1", "Process 2")
]

# Launch the workers in order
for proc in (p1, p2):
    proc.start()

# Wait here until each worker has exited
for proc in (p1, p2):
    proc.join()
输出
Running test "BIT_Test" on worker Process 1
Running test "BIT_Test" on worker Process 2
尝试像这样共享列表不是正确的方法。您应该使用进程安全的数据结构,例如 multiprocessing.Queue,或者更好的是,使用 multiprocessing.Pool,让它替您处理排队。您要做的事情非常适合使用 Pool.map:
import multiprocessing
def process_worker(test_file):
    """Announce which worker process picked up ``test_file``, then run it.

    Args:
        test_file: The single test to hand to ``process_test``.
    """
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("Running test {0} on worker {1}".format(
        test_file, multiprocessing.current_process().name))
    process_test(test_file)
if __name__ == "__main__":
    # The guard keeps child processes from re-creating the pool when they
    # re-import the main module (as happens with the "spawn" start method,
    # e.g. on Windows).
    p = multiprocessing.Pool(2)  # 2 processes in the pool
    # map puts each item from test_files in a Queue, lets the
    # two processes in our pool pull each item from the Queue,
    # and then execute process_worker with that item as an argument.
    p.map(process_worker, test_files)
    # No more work will be submitted; wait for the pool's workers to exit.
    p.close()
    p.join()
简单多了!
您也可以使用 multiprocessing.Manager 创建一个可在进程间共享的列表:
import multiprocessing
def process_worker(lock, test_files):
    """Pop tests off ``test_files`` one at a time and report each until none remain.

    Args:
        lock: Mutex guarding access to ``test_files``; used as a context
            manager, so it is released even if the pop raises.
        test_files: Mutable list of pending tests; entries are removed
            with ``pop()``.

    Returns:
        None, once ``test_files`` is found empty.
    """
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section: the emptiness check and the pop must happen
        # together under the lock.
        with lock:
            if len(test_files) != 0:
                test_file = test_files.pop()

        if test_file is None:
            # No more tests to process
            return

        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Running test %s on worker %s" % (
            test_file, multiprocessing.current_process().name))
        # process_test(test_file)  # the real work is disabled in this demo
if __name__ == "__main__":
    # The guard keeps child processes from re-running this setup when they
    # re-import the main module (as happens with the "spawn" start method,
    # e.g. on Windows).

    # Mutex for workers
    lock = multiprocessing.Lock()
    # The list comes from a Manager, so both workers operate on the same
    # underlying list rather than on private copies.
    manager = multiprocessing.Manager()
    test_files = manager.list(['f1', 'f2', 'f3'])
    # Declare our workers
    p1 = multiprocessing.Process(target=process_worker, name="Process 1", args=(lock, test_files))
    p2 = multiprocessing.Process(target=process_worker, name="Process 2", args=(lock, test_files))
    # Start processing
    p1.start()
    p2.start()
    # Block until both workers finish
    p1.join()
    p2.join()