队列中的所有任务已完成,但程序未继续
All tasks in Queue done, but program not continuing
我有一个线程 class 定义如下:
#!/usr/bin/python
import threading
import subprocess
class PingThread (threading.Thread):
ipstatus = ''
def __init__(self, ip):
threading.Thread.__init__(self)
self.ipaddress = ip
def ping(self, ip):
print 'Pinging ' + ip + '...'
ping_response = subprocess.Popen(["ping", "-c", "1", ip], stdout=subprocess.PIPE).stdout.read()
if '100.0% packet loss' not in str(ping_response):
return True
return False
def set_ip_status(self, status):
self.ipstatus = status
def get_ip_status(self):
return self.ipstatus
def run(self):
self.ipaddress = self.ipaddress.strip('\n\t')
pingResponse = self.ping(self.ipaddress)
if pingResponse:
self.set_ip_status(self.ipaddress + ' is up!')
else:
self.set_ip_status(self.ipaddress + ' is down!')
我正在查看一个 IP 地址列表并将其发送到 PingThread
并让这个 class ping IP 地址。当这些线程都完成后,我希望它通过并通过调用 get_ip_status()
获取每个线程的状态。我的代码中有 q.join()
,它应该等到队列中的所有项目都完成(根据我的理解,如果我错了,请纠正我,仍然是线程的新手)但是我的代码永远不会通过 q.join
。我测试过,所有线程都完成了,所有 ip 地址都被 ping 了,但是 q.join()
没有识别出来。为什么是这样?我究竟做错了什么?我正在创建这样的线程:
q = Queue.Queue()
for ip in trainips:
thread = PingThread(ip)
thread.start()
q.put(thread)
q.join()
while not q.empty():
print q.get().get_ip_status()
您误解了 Queue.join
的工作原理。 Queue.join
旨在与 Queue.task_done
一起使用;在生产者端,您将 put
项放入 Queue
中,然后调用 Queue.join
等待您 put
处理的所有项。然后在消费者端,您 get
来自 Queue
的项目,对其进行处理,然后在完成后调用 Queue.task_done
。一旦为 put
中的所有项目调用 task_done
进入 Queue
,Queue.join
将解锁。
但你没有那样做。您只是启动了一堆线程,将它们添加到 Queue
,然后在其上调用 join
。您根本没有使用 task_done
,您只是在 Queue.join
之后调用 Queue.get
,看起来您只是在使用它来获取线程对象之后完全的。但事实并非如此。 Queue
不知道里面有 Thread
个对象,简单地调用 Queue.join
不会等待里面的 Thread
个对象完成。
真的,看起来您需要做的就是将线程放入列表中,然后在每个线程上调用 join
。
threads = []
for ip in trainips:
thread = PingThread(ip)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print thread.get_ip_status()
正如文档所说,Queue.join
Blocks until all items in the queue have been gotten and processed.
但是在 join
之前,您不会每次都尝试获取这些项目(即便如此,您也不会将它们标记为已处理)。
因此,在完成 while
循环之前,您无法通过 join
,而在通过 join
之前,您无法通过 join
,因此您永远阻止。
要使 join
起作用,您必须将最后三行更改为:
while not q.empty():
print q.get().get_ip_status()
q.task_done()
q.join()
但是,更简单的解决方案是不 join
队列。相反,您 可以 join
所有线程;那么您就知道 get
所有值都是安全的。但请注意,如果您这样做,队列就没有理由成为 Queue
;它可以只是一个普通的旧 list
。在这一点上你实际上得到了 .
或者,您可以更改代码以实际使用队列。不是将线程放入队列中,而是将队列传递给线程函数,并将其结果放入队列中,而不是将其存储为属性。然后,你可以像你一样循环遍历 get()
,它会自动处理你需要的所有阻塞。文档中 Queue.join
的示例显示了如何几乎完全按照您的意愿去做。
后一种解决方案的优点是您不再需要任务和线程一对一映射——例如,使用 16 个线程池 运行 128 个任务,您仍然将在队列中得到 128 个值。*
* 但如果你想这样做,你可能会使用 multiprocessing.dummy.Pool
或(来自 PyPI 上的 concurrent.futures
backport) futures.ThreadPoolExecutor
而不是构建自己做。
我有一个线程 class 定义如下:
#!/usr/bin/python
import threading
import subprocess
class PingThread (threading.Thread):
ipstatus = ''
def __init__(self, ip):
threading.Thread.__init__(self)
self.ipaddress = ip
def ping(self, ip):
print 'Pinging ' + ip + '...'
ping_response = subprocess.Popen(["ping", "-c", "1", ip], stdout=subprocess.PIPE).stdout.read()
if '100.0% packet loss' not in str(ping_response):
return True
return False
def set_ip_status(self, status):
self.ipstatus = status
def get_ip_status(self):
return self.ipstatus
def run(self):
self.ipaddress = self.ipaddress.strip('\n\t')
pingResponse = self.ping(self.ipaddress)
if pingResponse:
self.set_ip_status(self.ipaddress + ' is up!')
else:
self.set_ip_status(self.ipaddress + ' is down!')
我正在查看一个 IP 地址列表并将其发送到 PingThread
并让这个 class ping IP 地址。当这些线程都完成后,我希望它通过并通过调用 get_ip_status()
获取每个线程的状态。我的代码中有 q.join()
,它应该等到队列中的所有项目都完成(根据我的理解,如果我错了,请纠正我,仍然是线程的新手)但是我的代码永远不会通过 q.join
。我测试过,所有线程都完成了,所有 ip 地址都被 ping 了,但是 q.join()
没有识别出来。为什么是这样?我究竟做错了什么?我正在创建这样的线程:
q = Queue.Queue()
for ip in trainips:
thread = PingThread(ip)
thread.start()
q.put(thread)
q.join()
while not q.empty():
print q.get().get_ip_status()
您误解了 Queue.join
的工作原理。 Queue.join
旨在与 Queue.task_done
一起使用;在生产者端,您将 put
项放入 Queue
中,然后调用 Queue.join
等待您 put
处理的所有项。然后在消费者端,您 get
来自 Queue
的项目,对其进行处理,然后在完成后调用 Queue.task_done
。一旦为 put
中的所有项目调用 task_done
进入 Queue
,Queue.join
将解锁。
但你没有那样做。您只是启动了一堆线程,将它们添加到 Queue
,然后在其上调用 join
。您根本没有使用 task_done
,您只是在 Queue.join
之后调用 Queue.get
,看起来您只是在使用它来获取线程对象之后完全的。但事实并非如此。 Queue
不知道里面有 Thread
个对象,简单地调用 Queue.join
不会等待里面的 Thread
个对象完成。
真的,看起来您需要做的就是将线程放入列表中,然后在每个线程上调用 join
。
threads = []
for ip in trainips:
thread = PingThread(ip)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print thread.get_ip_status()
正如文档所说,Queue.join
Blocks until all items in the queue have been gotten and processed.
但是在 join
之前,您不会每次都尝试获取这些项目(即便如此,您也不会将它们标记为已处理)。
因此,在完成 while
循环之前,您无法通过 join
,而在通过 join
之前,您无法通过 join
,因此您永远阻止。
要使 join
起作用,您必须将最后三行更改为:
while not q.empty():
print q.get().get_ip_status()
q.task_done()
q.join()
但是,更简单的解决方案是不 join
队列。相反,您 可以 join
所有线程;那么您就知道 get
所有值都是安全的。但请注意,如果您这样做,队列就没有理由成为 Queue
;它可以只是一个普通的旧 list
。在这一点上你实际上得到了
或者,您可以更改代码以实际使用队列。不是将线程放入队列中,而是将队列传递给线程函数,并将其结果放入队列中,而不是将其存储为属性。然后,你可以像你一样循环遍历 get()
,它会自动处理你需要的所有阻塞。文档中 Queue.join
的示例显示了如何几乎完全按照您的意愿去做。
后一种解决方案的优点是您不再需要任务和线程一对一映射——例如,使用 16 个线程池 运行 128 个任务,您仍然将在队列中得到 128 个值。*
* 但如果你想这样做,你可能会使用 multiprocessing.dummy.Pool
或(来自 PyPI 上的 concurrent.futures
backport) futures.ThreadPoolExecutor
而不是构建自己做。