在多线程生产者-消费者模式下,如何让工作线程在工作完成后退出?

How to make worker threads quit after work is finished in a multithreaded producer-consumer pattern?

我正在尝试使用以下方法实现多线程生产者-消费者模式 Queue.Queue 在 Python 2.7。我想弄清楚如何制作 消费者,即工作线程,在完成所有必需的工作后停止。

请参阅 Martin James 对此答案的第二条评论:

Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.

但这对我不起作用。例如,请参见以下代码。

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

这个程序在所有三个工人都读取了退出指示器后挂起, 即 -1 来自队列,因为每个工作人员在 -1 之前重新排队 退出,因此队列永远不会变空并且 q.join() 永远不会 returns.

我想出了以下但丑陋的解决方案,我发送了一个 -1 退出 通过队列为每个工作人员提供指示器,以便每个工作人员都可以看到它 并自杀。但事实上我必须发送一个退出指示器 为每个工人都觉得有点丑。

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

我有两个问题。

  1. 为所有线程发送单个退出指示符的方法(如 Martin James 在 的第二条评论中所解释的那样)是否可行?
  2. 如果上一个问题的答案是"No",有没有一种方法可以让我不必为每个工作线程发送单独的退出指示符来解决问题?

为所有线程发送单个退出指示符的方法(如 Martin James 在 的第二条评论中所解释的那样)是否可行?

如您所见,它无法工作,传播消息将使最后一个线程用另一个项目更新队列,因为您正在等待一个永远不会为空的队列,而不是您的代码有。

如果上一个问题的答案是"No",有没有办法解决这个问题,我不必为每个工人发送单独的退出指示器线程?

您可以 join 线程而不是队列:

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    threads = [threading.Thread(target=worker, args=(i, q)) for i in range(3)]
    for t in threads:
        threads.start()
    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    for t in threads:
        t.join()
    print 'done'

master()

因为 Queue documentation explain get 方法一旦为空就会引发一个 execption,所以如果您已经知道要处理的数据,您可以填充队列然后向线程发送垃圾邮件:

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        try:
            data = q.get(block=False, timeout=1)
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
        except Queue.Empty:
            break


def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

这里有一个live example

不要将其称为任务的特例。

改为使用 Event,为您的工作人员实现非阻塞。

stopping = threading.Event()

def worker(n, q, timeout=1):
    # run until the master thread indicates we're done
    while not stopping.is_set():
        try:
            # don't block indefinitely so we can return to the top
            # of the loop and check the stopping event
            data = q.get(True, timeout)
        # raised by q.get if we reach the timeout on an empty queue
        except queue.Empty:
            continue
        q.task_done()

def master():
    ...

    print 'waiting for workers to finish'
    q.join()
    stopping.set()
    print 'done'

In addition to @DanielSanchez excellent answer, I propose to actually rely on a similar mechanism as a Java CountDownLatch.

要点是,

  • 您创建了一个 latch,它将 仅在某个计数器下降后才打开
  • 当闩锁打开时,等待它的线程将被允许继续执行它们。

  • 我做了一个过于简单的例子,检查 here 是否有 class 这样的锁存器的例子:

    import threading
    import Queue
    import time
    
    WORKER_COUNT = 3
    latch = threading.Condition()
    count = 3
    
    def wait():
        latch.acquire()
        while count > 0:
            latch.wait()
        latch.release()
    
    def count_down():
        global count
        latch.acquire()
        count -= 1
        if count <= 0:
            latch.notify_all()
        latch.release()
    
    def worker(n, q):
        # n - Worker ID
        # q - Queue from which to receive data
        while True:
            data = q.get()
            print 'worker', n, 'got', data
            time.sleep(1)  # Simulate noticeable data processing time
            q.task_done()
            if data == -1: # -1 is used to indicate that the worker should stop
                # Requeue the exit indicator.
                q.put(-1)
                # Commit suicide.
                count_down()
                print 'worker', n, 'is exiting'
                break
    
    # master() sends data to worker() via q.  
    
    def master():
        q = Queue.Queue()
    
        # Create 3 workers.
        for i in range(WORKER_COUNT):
            t = threading.Thread(target=worker, args=(i, q))
            t.start()
    
        # Send 10 items to work on.
        for i in range(10):
            q.put(i)
            time.sleep(0.5)
    
        # Send an exit indicator for all threads to consume.
        q.put(-1)
        wait()
        print 'done'
    
    master()
    

为了完整起见: 您还可以排队一个停止信号,它是 -(线程数)。 然后每个线程都可以将其递增 1 并仅当停止信号为 != 0 时才将其重新排队。

    if data < 0: # negative numbers are used to indicate that the worker should stop
        if data < -1:
            q.put(data + 1)
        # Commit suicide.
        print 'worker', n, 'is exiting'
        break

但我个人会选择 Travis Mehlinger Daniel Sanchez 答案。