使用“thread.join()”时多线程冻结
Multithreading freezes when using `thread.join()`
我正在尝试设置 3 个线程并在队列中执行 5 个任务。这个想法是线程将首先 运行 同时执行前 3 个任务,然后 2 个线程完成剩余的 2 个任务。但是程序似乎冻结了。我无法检测到任何问题。
from multiprocessing import Manager
import threading
import time
# NOTE(review): `global` at module level is a no-op; exitFlag is the shared
# stop flag that the worker threads poll (this is the questioner's code).
global exitFlag
exitFlag = 0
class myThread(threading.Thread):
    """Worker thread (questioner's code): runs process_data on the shared queue."""

    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID  # numeric id; not read anywhere in this listing
        self.name = name          # overrides threading.Thread's own .name
        self.q = q                # work queue shared by all workers

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)
def process_data(threadName, q):
    """Worker loop (questioner's code): drain items from *q* until exitFlag is set.

    NOTE(review): this is the buggy code under discussion.  The
    empty()/get() pair below is not atomic -- another worker can take the
    last item between the two calls, after which q.get() blocks forever,
    which is the freeze the question describes.
    """
    global exitFlag
    while not exitFlag:
        # Race: checks the module-level workQueue, then reads from q
        # (the same queue) -- not an atomic check-then-act.
        if not workQueue.empty():
            data = q.get()
            print("%s processing %s" % (threadName, data))
        else:
            pass
        time.sleep(1)
    print('Nothing to Process')
# --- module-level driver (questioner's code) ---
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()  # NOTE(review): acquired only below, never by the workers
workQueue = Manager().Queue(10)  # NOTE(review): a Manager queue lives in a separate process; queue.Queue would do
threads = []
threadID = 1
# create thread
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1
# fill up queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()
# wait queue clear
# NOTE(review): busy-wait burns CPU; empty() becoming True also does not
# mean the workers have finished their current item.
while not workQueue.empty():
    pass
# notify thread exit
exitFlag = 1
# wait for all threads to finish
for t in threads:
    t.join()
print("Exiting Main Thread")
我不知道到底发生了什么,但在我删除 join()
部分后,程序就能正常运行了。我不明白的是,exitFlag 应该在队列清空时发出信号,所以看起来 process_data()
没有检测到这个信号。
您的代码存在多个问题。首先,由于全局解释器锁 (GIL),CPython 中的线程不会 运行 Python 代码 "at the same time"。线程必须持有 GIL 才能执行 Python 字节码。默认情况下,如果一个线程因为阻塞 I/O 而没有更早地丢弃它,它最多会保留 GIL 5 毫秒 (Python 3.2+)。要并行执行 Python 代码,您必须使用 multiprocessing
.
您还不必要地使用 Manager.Queue
而不是 queue.Queue
。 Manager.Queue
是单独管理器进程上的 queue.Queue
。你在这里引入了 IPC 和内存复制的弯路,没有任何好处。
你的死锁的原因是你在这里有一个竞争条件:
if not workQueue.empty():
data = q.get()
这不是原子操作。一个线程可以检查 workQueue.empty()
,然后丢弃 GIL,让另一个线程排空队列,然后继续 data = q.get()
,如果您不再往队列中放入东西,它将永远阻塞。对 Queue.empty()
的检查是一种常见的反模式,没有必要使用它。应使用毒丸(哨兵值)来结束 get 循环,并让工作线程知道它们应该退出。哨兵值的数量需要与工作线程的数量相同。可以进一步查阅 iter(callable, sentinel)
的相关资料。
import time
from queue import Queue
from datetime import datetime
from threading import Thread, current_thread
SENTINEL = 'SENTINEL'  # poison pill: one per worker tells it to exit
class myThread(Thread):
    """Worker thread: logs start/exit and applies *func* to *inqueue*."""

    def __init__(self, func, inqueue):
        super().__init__()
        self.func = func
        self._inqueue = inqueue

    def _log(self, event):
        # Timestamped status line, e.g. "2019-... Thread-1 starting".
        print(f"{datetime.now()} {current_thread().name} {event}")

    def run(self):
        self._log("starting")
        self.func(self._inqueue)
        self._log("exiting")
def process_data(_inqueue):
    """Worker body: consume items from *_inqueue* until a SENTINEL arrives.

    iter(callable, sentinel) calls _inqueue.get() repeatedly and stops as
    soon as the returned value equals SENTINEL, so no fragile
    empty()-then-get() check is needed.
    """
    for data in iter(_inqueue.get, SENTINEL):
        print(f"{datetime.now()} {current_thread().name} "
              f"processing {data}")
        time.sleep(1)  # simulate per-item work
if __name__ == '__main__':
    N_WORKERS = 3
    inqueue = Queue()
    input_data = ["One", "Two", "Three", "Four", "Five"]
    sentinels = [SENTINEL] * N_WORKERS # one sentinel value per worker
    # enqueue input and sentinels
    for word in input_data + sentinels:
        inqueue.put(word)
    threads = [myThread(process_data, inqueue) for _ in range(N_WORKERS)]
    for t in threads:
        t.start()
    # join() cannot hang here: each worker is guaranteed to eventually
    # receive its own sentinel and leave its get-loop.
    for t in threads:
        t.join()
    print(f"{datetime.now()} {current_thread().name} exiting")
示例输出:
2019-02-14 17:58:18.265208 Thread-1 starting
2019-02-14 17:58:18.265277 Thread-1 processing One
2019-02-14 17:58:18.265472 Thread-2 starting
2019-02-14 17:58:18.265542 Thread-2 processing Two
2019-02-14 17:58:18.265691 Thread-3 starting
2019-02-14 17:58:18.265793 Thread-3 processing Three
2019-02-14 17:58:19.266417 Thread-1 processing Four
2019-02-14 17:58:19.266632 Thread-2 processing Five
2019-02-14 17:58:19.266767 Thread-3 exiting
2019-02-14 17:58:20.267588 Thread-1 exiting
2019-02-14 17:58:20.267861 Thread-2 exiting
2019-02-14 17:58:20.267994 MainThread exiting
Process finished with exit code 0
如果你不坚持继承Thread
,你也可以只使用multiprocessing.pool.ThreadPool
a.k.a。 multiprocessing.dummy.Pool
它在后台为你做管道。
我正在尝试设置 3 个线程并在队列中执行 5 个任务。这个想法是线程将首先 运行 同时执行前 3 个任务,然后 2 个线程完成剩余的 2 个任务。但是程序似乎冻结了。我无法检测到任何问题。
from multiprocessing import Manager
import threading
import time
# NOTE(review): `global` at module level is a no-op; exitFlag is the shared
# stop flag that the worker threads poll (this is the questioner's code).
global exitFlag
exitFlag = 0
class myThread(threading.Thread):
    """Worker thread (questioner's code): runs process_data on the shared queue."""

    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID  # numeric id; not read anywhere in this listing
        self.name = name          # overrides threading.Thread's own .name
        self.q = q                # work queue shared by all workers

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)
def process_data(threadName, q):
    """Worker loop (questioner's code): drain items from *q* until exitFlag is set.

    NOTE(review): this is the buggy code under discussion.  The
    empty()/get() pair below is not atomic -- another worker can take the
    last item between the two calls, after which q.get() blocks forever,
    which is the freeze the question describes.
    """
    global exitFlag
    while not exitFlag:
        # Race: checks the module-level workQueue, then reads from q
        # (the same queue) -- not an atomic check-then-act.
        if not workQueue.empty():
            data = q.get()
            print("%s processing %s" % (threadName, data))
        else:
            pass
        time.sleep(1)
    print('Nothing to Process')
# --- module-level driver (questioner's code) ---
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()  # NOTE(review): acquired only below, never by the workers
workQueue = Manager().Queue(10)  # NOTE(review): a Manager queue lives in a separate process; queue.Queue would do
threads = []
threadID = 1
# create thread
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1
# fill up queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()
# wait queue clear
# NOTE(review): busy-wait burns CPU; empty() becoming True also does not
# mean the workers have finished their current item.
while not workQueue.empty():
    pass
# notify thread exit
exitFlag = 1
# wait for all threads to finish
for t in threads:
    t.join()
print("Exiting Main Thread")
我不知道到底发生了什么,但在我删除 join()
部分后,程序就能正常运行了。我不明白的是,exitFlag 应该在队列清空时发出信号,所以看起来 process_data() 没有检测到这个信号。
您的代码存在多个问题。首先,由于全局解释器锁 (GIL),CPython 中的线程不会 运行 Python 代码 "at the same time"。线程必须持有 GIL 才能执行 Python 字节码。默认情况下,如果一个线程因为阻塞 I/O 而没有更早地丢弃它,它最多会保留 GIL 5 毫秒 (Python 3.2+)。要并行执行 Python 代码,您必须使用 multiprocessing
.
您还不必要地使用 Manager.Queue
而不是 queue.Queue
。 Manager.Queue
是单独管理器进程上的 queue.Queue
。你在这里引入了 IPC 和内存复制的弯路,没有任何好处。
你的死锁的原因是你在这里有一个竞争条件:
if not workQueue.empty():
data = q.get()
这不是原子操作。一个线程可以检查 workQueue.empty()
,然后丢弃 GIL,让另一个线程排空队列,然后继续 data = q.get()
,如果您不再往队列中放入东西,它将永远阻塞。对 Queue.empty()
的检查是一种常见的反模式,没有必要使用它。应使用毒丸(哨兵值)来结束 get 循环,并让工作线程知道它们应该退出。哨兵值的数量需要与工作线程的数量相同。可以进一步查阅 iter(callable, sentinel) 的相关资料。
import time
from queue import Queue
from datetime import datetime
from threading import Thread, current_thread
SENTINEL = 'SENTINEL'  # poison pill: one per worker tells it to exit
class myThread(Thread):
    """Worker thread: logs start/exit and applies *func* to *inqueue*."""

    def __init__(self, func, inqueue):
        super().__init__()
        self.func = func
        self._inqueue = inqueue

    def _log(self, event):
        # Timestamped status line, e.g. "2019-... Thread-1 starting".
        print(f"{datetime.now()} {current_thread().name} {event}")

    def run(self):
        self._log("starting")
        self.func(self._inqueue)
        self._log("exiting")
def process_data(_inqueue):
    """Worker body: consume items from *_inqueue* until a SENTINEL arrives.

    iter(callable, sentinel) calls _inqueue.get() repeatedly and stops as
    soon as the returned value equals SENTINEL, so no fragile
    empty()-then-get() check is needed.
    """
    for data in iter(_inqueue.get, SENTINEL):
        print(f"{datetime.now()} {current_thread().name} "
              f"processing {data}")
        time.sleep(1)  # simulate per-item work
if __name__ == '__main__':
    N_WORKERS = 3
    inqueue = Queue()
    input_data = ["One", "Two", "Three", "Four", "Five"]
    sentinels = [SENTINEL] * N_WORKERS # one sentinel value per worker
    # enqueue input and sentinels
    for word in input_data + sentinels:
        inqueue.put(word)
    threads = [myThread(process_data, inqueue) for _ in range(N_WORKERS)]
    for t in threads:
        t.start()
    # join() cannot hang here: each worker is guaranteed to eventually
    # receive its own sentinel and leave its get-loop.
    for t in threads:
        t.join()
    print(f"{datetime.now()} {current_thread().name} exiting")
示例输出:
2019-02-14 17:58:18.265208 Thread-1 starting
2019-02-14 17:58:18.265277 Thread-1 processing One
2019-02-14 17:58:18.265472 Thread-2 starting
2019-02-14 17:58:18.265542 Thread-2 processing Two
2019-02-14 17:58:18.265691 Thread-3 starting
2019-02-14 17:58:18.265793 Thread-3 processing Three
2019-02-14 17:58:19.266417 Thread-1 processing Four
2019-02-14 17:58:19.266632 Thread-2 processing Five
2019-02-14 17:58:19.266767 Thread-3 exiting
2019-02-14 17:58:20.267588 Thread-1 exiting
2019-02-14 17:58:20.267861 Thread-2 exiting
2019-02-14 17:58:20.267994 MainThread exiting
Process finished with exit code 0
如果你不坚持继承Thread
,你也可以只使用multiprocessing.pool.ThreadPool
a.k.a。 multiprocessing.dummy.Pool
它在后台为你做管道。