"Python threading barrier" 为什么这段代码有效,有没有更好的方法?
"Python threading barrier" Why this code works and is there any better way?
我搜索了python障碍,但相关问题很少。我仍然对 barrier.wait() 感到困惑,即使我的代码有效。
我利用python屏障实现了这样一个功能:一个主线程,n个子线程。在每一轮中,主线程等待所有子线程完成当前工作,然后所有线程进入下一轮,直到满足某些条件。因此,我发现barrier很适合实现这个功能,下面是我的主线程代码。
def superstep(self):
workers = []
barrier = threading.Barrier(self.num_workers+1)
for vertex in self.vertices:
worker = Worker(vertex, barrier)
workers.append(worker)
worker.start()
while self.flag:
barrier.wait()
self.distributeMessages()
self.count += 1
print ("superstep: ", self.count)
self.flag = self.isTerminated()
for worker in workers:
worker.flag = False
for worker in workers:
worker.join()
- 第一个 'for' 循环创建了 n 个名为 worker 的线程并存储在列表 workers 中。
- 'while'循环是主线程等待其他子线程,当self.flag为False时中断
- 第二个'for'循环用于在每个worker(子线程)中将标志设置为False,告诉他们退出循环。
这是我的工人 class。
class Worker(threading.Thread):
def __init__(self, vertex, barrier):
threading.Thread.__init__(self)
self.vertex = vertex
self.flag = True
self.barrier = barrier
def run(self):
while self.flag:
self.barrier.wait()
do something
代码运行良好,所有线程都可以加入()。但是当我查看python barrier时,所有线程都会在所有线程调用wait()时同时释放。如果主线程从它的 while 循环中中断,而所有其他线程都在等待它,在这种情况下,第二个 'for' 循环是无用的,子线程永远不会 join().
那么这段代码是如何工作的,有没有其他方法可以退出障碍而不是引发 BrokenBarrierError? 另外,如果我在第二个中添加一些代码 'for' 循环,打印一些信息什么的,程序就被阻塞了。我猜肯定有一些子线程在 wait() 中,没有机会检查标志,所以它们不能从线程的 运行() 中退出。
如果您不想使用 abort
,您可以在每个线程中调用两次 Barrier.wait
。这会将操作分为两部分。在第一部分中,工作线程将完成它们的工作,而主线程将更新标志状态。然后在第二部分,每个线程都会检查标志状态并在必要时退出循环。
在代码层面上看起来像这样:
# Main
def superstep(self):
workers = []
barrier = threading.Barrier(self.num_workers+1)
for vertex in self.vertices:
worker = Worker(vertex, barrier)
workers.append(worker)
worker.start()
while self.flag:
barrier.wait()
self.distributeMessages()
self.count += 1
print ("superstep: ", self.count)
self.flag = self.isTerminated()
for worker in workers:
worker.flag = self.flag
barrier.wait()
for worker in workers:
worker.join()
# Worker
def run(self):
while self.flag:
self.barrier.wait()
# do something
self.barrier.wait()
您可以拨打
self.barrier.abort()
在第二个for循环后释放等待的worker,并在worker的run()
方法中捕获BrokenBarrierError
。
我搜索了python障碍,但相关问题很少。我仍然对 barrier.wait() 感到困惑,即使我的代码有效。
我利用python屏障实现了这样一个功能:一个主线程,n个子线程。在每一轮中,主线程等待所有子线程完成当前工作,然后所有线程进入下一轮,直到满足某些条件。因此,我发现barrier很适合实现这个功能,下面是我的主线程代码。
def superstep(self):
workers = []
barrier = threading.Barrier(self.num_workers+1)
for vertex in self.vertices:
worker = Worker(vertex, barrier)
workers.append(worker)
worker.start()
while self.flag:
barrier.wait()
self.distributeMessages()
self.count += 1
print ("superstep: ", self.count)
self.flag = self.isTerminated()
for worker in workers:
worker.flag = False
for worker in workers:
worker.join()
- 第一个 'for' 循环创建了 n 个名为 worker 的线程并存储在列表 workers 中。
- 'while'循环是主线程等待其他子线程,当self.flag为False时中断
- 第二个'for'循环用于在每个worker(子线程)中将标志设置为False,告诉他们退出循环。
这是我的工人 class。
class Worker(threading.Thread):
def __init__(self, vertex, barrier):
threading.Thread.__init__(self)
self.vertex = vertex
self.flag = True
self.barrier = barrier
def run(self):
while self.flag:
self.barrier.wait()
do something
代码运行良好,所有线程都可以加入()。但是当我查看python barrier时,所有线程都会在所有线程调用wait()时同时释放。如果主线程从它的 while 循环中中断,而所有其他线程都在等待它,在这种情况下,第二个 'for' 循环是无用的,子线程永远不会 join().
那么这段代码是如何工作的,有没有其他方法可以退出障碍而不是引发 BrokenBarrierError? 另外,如果我在第二个中添加一些代码 'for' 循环,打印一些信息什么的,程序就被阻塞了。我猜肯定有一些子线程在 wait() 中,没有机会检查标志,所以它们不能从线程的 运行() 中退出。
如果您不想使用 abort
,您可以在每个线程中调用两次 Barrier.wait
。这会将操作分为两部分。在第一部分中,工作线程将完成它们的工作,而主线程将更新标志状态。然后在第二部分,每个线程都会检查标志状态并在必要时退出循环。
在代码层面上看起来像这样:
# Main
def superstep(self):
workers = []
barrier = threading.Barrier(self.num_workers+1)
for vertex in self.vertices:
worker = Worker(vertex, barrier)
workers.append(worker)
worker.start()
while self.flag:
barrier.wait()
self.distributeMessages()
self.count += 1
print ("superstep: ", self.count)
self.flag = self.isTerminated()
for worker in workers:
worker.flag = self.flag
barrier.wait()
for worker in workers:
worker.join()
# Worker
def run(self):
while self.flag:
self.barrier.wait()
# do something
self.barrier.wait()
您可以拨打
self.barrier.abort()
在第二个for循环后释放等待的worker,并在worker的run()
方法中捕获BrokenBarrierError
。