"Python threading barrier" 为什么这段代码有效,有没有更好的方法?

"Python threading barrier" Why this code works and is there any better way?

我搜索了python障碍,但相关问题很少。我仍然对 barrier.wait() 感到困惑,即使我的代码有效。

我利用python屏障实现了这样一个功能:一个主线程,n个子线程。在每一轮中,主线程等待所有子线程完成当前工作,然后所有线程进入下一轮,直到满足某些条件。因此,我发现barrier很适合实现这个功能,下面是我的主线程代码。

 def superstep(self):
    workers = []
    barrier = threading.Barrier(self.num_workers+1)
    for vertex in self.vertices:
        worker = Worker(vertex, barrier)
        workers.append(worker)
        worker.start()

    while self.flag:
        barrier.wait()
        self.distributeMessages()
        self.count += 1
        print ("superstep: ", self.count)
        self.flag = self.isTerminated()

    for worker in workers:
        worker.flag = False

    for worker in workers:
        worker.join()
  1. 第一个 'for' 循环创建了 n 个名为 worker 的线程并存储在列表 workers 中。
  2. 'while'循环是主线程等待其他子线程,当self.flag为False时中断
  3. 第二个'for'循环用于在每个worker(子线程)中将标志设置为False,告诉他们退出循环。

这是我的工人 class。

class Worker(threading.Thread):
    def __init__(self, vertex, barrier):
        threading.Thread.__init__(self)
        self.vertex = vertex
        self.flag = True
        self.barrier = barrier

    def run(self):
        while self.flag:
            self.barrier.wait()
            do something

代码运行良好,所有线程都可以加入()。但是当我查看python barrier时,所有线程都会在所有线程调用wait()时同时释放。如果主线程从它的 while 循环中中断,而所有其他线程都在等待它,在这种情况下,第二个 'for' 循环是无用的,子线程永远不会 join().

那么这段代码是如何工作的,有没有其他方法可以退出障碍而不是引发 BrokenBarrierError? 另外,如果我在第二个中添加一些代码 'for' 循环,打印一些信息什么的,程序就被阻塞了。我猜肯定有一些子线程在 wait() 中,没有机会检查标志,所以它们不能从线程的 运行() 中退出。

如果您不想使用 abort,您可以在每个线程中调用两次 Barrier.wait。这会将操作分为两部分。在第一部分中,工作线程将完成它们的工作,而主线程将更新标志状态。然后在第二部分,每个线程都会检查标志状态并在必要时退出循环。

在代码层面上看起来像这样:

# Main
def superstep(self):
    workers = []
    barrier = threading.Barrier(self.num_workers+1)
    for vertex in self.vertices:
        worker = Worker(vertex, barrier)
        workers.append(worker)
        worker.start()

    while self.flag:
        barrier.wait()
        self.distributeMessages()
        self.count += 1
        print ("superstep: ", self.count)
        self.flag = self.isTerminated()
        for worker in workers:
            worker.flag = self.flag
        barrier.wait()

    for worker in workers:
        worker.join()

# Worker
def run(self):
    while self.flag:
        self.barrier.wait()
        # do something
        self.barrier.wait() 

您可以拨打

self.barrier.abort()

在第二个for循环后释放等待的worker,并在worker的run()方法中捕获BrokenBarrierError