队列中的所有任务已完成,但程序未继续

All tasks in Queue done, but program not continuing

我有一个线程 class 定义如下:

#!/usr/bin/python

import threading
import subprocess

class PingThread (threading.Thread):
    ipstatus = ''
    def __init__(self, ip):
        threading.Thread.__init__(self)
        self.ipaddress = ip


    def ping(self, ip):
        print 'Pinging ' + ip + '...'
        ping_response = subprocess.Popen(["ping", "-c", "1", ip], stdout=subprocess.PIPE).stdout.read()
        if '100.0% packet loss' not in str(ping_response):
            return True
        return False

    def set_ip_status(self, status):
        self.ipstatus = status

    def get_ip_status(self):
        return self.ipstatus

    def run(self):
        self.ipaddress = self.ipaddress.strip('\n\t')
        pingResponse = self.ping(self.ipaddress)
        if pingResponse:
            self.set_ip_status(self.ipaddress + ' is up!')
        else:
            self.set_ip_status(self.ipaddress + ' is down!')

我正在查看一个 IP 地址列表并将其发送到 PingThread 并让这个 class ping IP 地址。当这些线程都完成后,我希望它通过并通过调用 get_ip_status() 获取每个线程的状态。我的代码中有 q.join(),它应该等到队列中的所有项目都完成(根据我的理解,如果我错了,请纠正我,仍然是线程的新手)但是我的代码永远不会通过 q.join。我测试过,所有线程都完成了,所有 ip 地址都被 ping 了,但是 q.join() 没有识别出来。为什么是这样?我究竟做错了什么?我正在创建这样的线程:

q = Queue.Queue()
for ip in trainips:
    thread = PingThread(ip)
    thread.start()
    q.put(thread)
q.join()
while not q.empty():
    print q.get().get_ip_status()

您误解了 Queue.join 的工作原理。 Queue.join 旨在与 Queue.task_done 一起使用;在生产者端,您将 put 项放入 Queue 中,然后调用 Queue.join 等待您 put 处理的所有项。然后在消费者端,您 get 来自 Queue 的项目,对其进行处理,然后在完成后调用 Queue.task_done。一旦为 put 中的所有项目调用 task_done 进入 QueueQueue.join 将解锁。

但你没有那样做。您只是启动了一堆线程,将它们添加到 Queue,然后在其上调用 join。您根本没有使用 task_done,您只是在 Queue.join 之后调用 Queue.get,看起来您只是在使用它来获取线程对象之后完全的。但事实并非如此。 Queue 不知道里面有 Thread 个对象,简单地调用 Queue.join 不会等待里面的 Thread 个对象完成。

真的,看起来您需要做的就是将线程放入列表中,然后在每个线程上调用 join

threads = []
for ip in trainips:
    thread = PingThread(ip)
    thread.start()
    threads.append(thread)
for thread in threads:
    thread.join()
    print thread.get_ip_status()

正如文档所说,Queue.join

Blocks until all items in the queue have been gotten and processed.

但是在 join 之前,您不会每次都尝试获取这些项目(即便如此,您也不会将它们标记为已处理)。

因此,在完成 while 循环之前,您无法通过 join,而在通过 join 之前,您无法通过 join,因此您永远阻止。

要使 join 起作用,您必须将最后三行更改为:

while not q.empty():
    print q.get().get_ip_status()
    q.task_done()
q.join()

但是,更简单的解决方案是不 join 队列。相反,您 可以 join 所有线程;那么您就知道 get 所有值都是安全的。但请注意,如果您这样做,队列就没有理由成为 Queue;它可以只是一个普通的旧 list。在这一点上你实际上得到了 .

或者,您可以更改代码以实际使用队列。不是将线程放入队列中,而是将队列传递给线程函数,并将其结果放入队列中,而不是将其存储为属性。然后,你可以像你一样循环遍历 get() ,它会自动处理你需要的所有阻塞。文档中 Queue.join 的示例显示了如何几乎完全按照您的意愿去做。

后一种解决方案的优点是您不再需要任务和线程一对一映射——例如,使用 16 个线程池 运行 128 个任务,您仍然将在队列中得到 128 个值。*


* 但如果你想这样做,你可能会使用 multiprocessing.dummy.Pool 或(来自 PyPI 上的 concurrent.futures backport) futures.ThreadPoolExecutor 而不是构建自己做。