Avoid waiting for threads to finish in Python
I wrote this script to read data from a txt file and process it. But it seems that if I give it a large file and a high thread count, the further it reads into the list, the slower the script gets.
Is there a way to avoid waiting for all the threads to finish, and instead start a new one whenever a thread is done with its work?
Also, the script doesn't seem to exit when it finishes processing.
import threading, Queue, time

class Work(threading.Thread):

    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.Lock = threading.Lock()
        self.jobs = jobs

    def myFunction(self):
        # simulate work
        self.Lock.acquire()
        print("Firstname: " + self.firstname + " Lastname: " + self.lastname)
        self.Lock.release()
        time.sleep(3)

    def run(self):
        while True:
            self.item = self.jobs.get().rstrip()
            self.firstname = self.item.split(":")[0]
            self.lastname = self.item.split(":")[1]
            self.myFunction()
            self.jobs.task_done()

def main(file):
    jobs = Queue.Queue()
    myList = open(file, "r").readlines()
    MAX_THREADS = 10
    pool = [Work(jobs) for i in range(MAX_THREADS)]

    for thread in pool:
        thread.start()

    for item in myList:
        jobs.put(item)

    for thread in pool:
        thread.join()

if __name__ == '__main__':
    main('list.txt')
The script seems to take longer on larger inputs because there is a 3-second pause between each batch of prints.
The reason the script doesn't finish is that, since you are using a Queue, you need to call join() on the Queue, not on the individual threads. To make sure the script returns when the jobs stop running, you should also set daemon = True.

The Lock also won't work in your current code, because threading.Lock() produces a new lock each time it is called, so every thread ends up acquiring its own private lock. You need all of the jobs to share the same lock.

If you want to use this in Python 3 (and you should), the Queue module has been renamed to queue.
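If you need the same script to run under both versions, one common pattern is a guarded import (a small sketch, not part of the original answer):

try:
    import queue  # Python 3: the module was renamed
except ImportError:
    import Queue as queue  # Python 2 fallback

With those fixes applied, your code looks like this: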
import threading, Queue, time

lock = threading.Lock()  # One lock shared by all workers

class Work(threading.Thread):

    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.daemon = True  # Set daemon so the process can exit
        self.jobs = jobs

    def myFunction(self):
        # simulate work
        lock.acquire()  # All jobs share the one lock
        print("Firstname: " + self.firstname + " Lastname: " + self.lastname)
        lock.release()
        time.sleep(3)

    def run(self):
        while True:
            self.item = self.jobs.get().rstrip()
            self.firstname = self.item.split(":")[0]
            self.lastname = self.item.split(":")[1]
            self.myFunction()
            self.jobs.task_done()

def main(file):
    jobs = Queue.Queue()

    with open(file, 'r') as fp:  # Close the file when we're done
        myList = fp.readlines()

    MAX_THREADS = 10
    pool = [Work(jobs) for i in range(MAX_THREADS)]

    for thread in pool:
        thread.start()

    for item in myList:
        jobs.put(item)

    jobs.join()  # Join the Queue, not the threads

if __name__ == '__main__':
    main('list.txt')
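As a side note, threading.Lock also works as a context manager, so the acquire/release pair in myFunction could be written with a with block, which releases the lock even if the print raises (a minor variation, not part of the original answer):

    def myFunction(self):
        # simulate work
        with lock:  # acquired on entry, released on exit, even on error
            print("Firstname: " + self.firstname + " Lastname: " + self.lastname)
        time.sleep(3)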
A simpler example (based on the example in the Python docs):
import threading
import time
from Queue import Queue  # Py2
# from queue import Queue  # Py3

lock = threading.Lock()

def worker():
    while True:
        item = jobs.get()
        if item is None:
            break
        firstname, lastname = item.split(':')
        lock.acquire()
        print("Firstname: " + firstname + " Lastname: " + lastname)
        lock.release()
        time.sleep(3)
        jobs.task_done()

jobs = Queue()
pool = []
MAX_THREADS = 10

for i in range(MAX_THREADS):
    thread = threading.Thread(target=worker)
    thread.start()
    pool.append(thread)

with open('list.txt') as fp:
    for line in fp:
        jobs.put(line.rstrip())

# block until all tasks are done
jobs.join()

# stop workers
for i in range(MAX_THREADS):
    jobs.put(None)
for thread in pool:
    thread.join()
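If you are on Python 3, the same fan-out pattern can also be expressed with concurrent.futures.ThreadPoolExecutor, which manages the worker threads and the work queue for you. This is a sketch under the same assumptions as above (each line of list.txt is firstname:lastname), not part of the original answer:

import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()

def process(item):
    firstname, lastname = item.split(':')
    with lock:  # keep prints from different threads from interleaving
        print("Firstname: " + firstname + " Lastname: " + lastname)
    time.sleep(3)  # simulate work

with open('list.txt') as fp:
    lines = [line.rstrip() for line in fp]

# The executor keeps at most 10 threads busy; leaving the with block
# waits for all submitted work to finish, so the script exits cleanly.
with ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(process, lines)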