Avoid waiting for threads to finish in Python

I wrote this script to read data from a txt file and process it. But it seems that if I give it a large file and a lot of threads, the more it reads from the list, the slower the script gets.

Is there a way to avoid waiting for all the threads to finish, and instead start a new thread as soon as one has finished its work?

Also, the script doesn't seem to exit when it has finished processing.

import threading, Queue, time

class Work(threading.Thread):

    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.Lock = threading.Lock()
        self.jobs = jobs

    def myFunction(self):
        #simulate work
        self.Lock.acquire()
        print("Firstname: "+ self.firstname + " Lastname: "+ self.lastname)
        self.Lock.release()
        time.sleep(3)

    def run(self):
        while True:
            self.item = self.jobs.get().rstrip()
            self.firstname = self.item.split(":")[0]
            self.lastname = self.item.split(":")[1]
            self.myFunction()
            self.jobs.task_done()

def main(file):
    jobs = Queue.Queue()
    myList = open(file, "r").readlines()
    MAX_THREADS = 10
    pool = [Work(jobs) for i in range(MAX_THREADS)]
    for thread in pool:
        thread.start()
    for item in myList:
        jobs.put(item)
    for thread in pool:
        thread.join()

if __name__ == '__main__':
    main('list.txt')

The script only seems to take longer on larger inputs because of the 3-second pause between each batch of prints.

The reason the script doesn't finish is that, since you're using a Queue, you need to call join() on the Queue, not on the individual threads. To make sure the script returns when the jobs stop running, you should also set daemon = True.

The Lock also won't work in the current code, because threading.Lock() produces a new lock each time it is called, so each thread gets its own. You need all the workers to share the same lock.

If you want to use this on Python 3 (and you should), note that the Queue module has been renamed to queue.
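One way to handle the rename without touching the rest of the script is a small import shim (an illustrative pattern, not part of the original answer):

```python
# Hedged sketch: import the renamed module under one name so the rest of
# the script works unchanged on both Python 2 and Python 3.
try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 name

jobs = queue.Queue()
jobs.put("John:Smith")
item = jobs.get()
jobs.task_done()
```

After this, every use of `Queue.Queue()` in the script becomes `queue.Queue()` on either version.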

import threading, Queue, time

lock = threading.Lock()  # One lock

class Work(threading.Thread):

    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.daemon = True  # set daemon
        self.jobs = jobs

    def myFunction(self):
        #simulate work
        lock.acquire()  # All jobs share the one lock
        print("Firstname: "+ self.firstname + " Lastname: "+ self.lastname)
        lock.release()
        time.sleep(3)

    def run(self):
        while True:
            self.item = self.jobs.get().rstrip()
            self.firstname = self.item.split(":")[0]
            self.lastname = self.item.split(":")[1]
            self.myFunction()
            self.jobs.task_done()


def main(file):
    jobs = Queue.Queue()
    with open(file, 'r') as fp:  # Close the file when we're done
        myList = fp.readlines()
    MAX_THREADS = 10
    pool = [Work(jobs) for i in range(MAX_THREADS)]
    for thread in pool:
        thread.start()
    for item in myList:
        jobs.put(item)
    jobs.join()    # Join the Queue


if __name__ == '__main__':
    main('list.txt')

A simpler example (based on the example in the Python docs):

import threading
import time
from Queue import Queue # Py2
# from queue import Queue # Py3

lock = threading.Lock()

def worker():
    while True:
        item = jobs.get()
        if item is None:
            break
        firstname, lastname = item.split(':')
        lock.acquire()
        print("Firstname: " + firstname + " Lastname: " + lastname)
        lock.release()
        time.sleep(3)
        jobs.task_done()

jobs = Queue()
pool = []
MAX_THREADS = 10
for i in range(MAX_THREADS):
    thread = threading.Thread(target=worker)
    thread.start()
    pool.append(thread)

with open('list.txt') as fp:
    for line in fp:
        jobs.put(line.rstrip())

# block until all tasks are done
jobs.join()

# stop workers
for i in range(MAX_THREADS):
    jobs.put(None)
for thread in pool:
    thread.join()
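On Python 3 you can skip the manual queue and sentinels entirely with concurrent.futures. The pool keeps a fixed number of workers busy and hands the next job to whichever thread finishes first, which is exactly the behavior the question asks for. A minimal sketch (the `process` function and the inline sample lines are stand-ins for the original `myFunction` and `list.txt`):

```python
# Alternative sketch using concurrent.futures.ThreadPoolExecutor (Python 3.2+).
from concurrent.futures import ThreadPoolExecutor
import threading

lock = threading.Lock()

def process(item):
    firstname, lastname = item.split(":")
    with lock:  # serialize printing, as in the original
        print("Firstname: " + firstname + " Lastname: " + lastname)
    return firstname, lastname

MAX_THREADS = 10
lines = ["John:Smith", "Jane:Doe"]  # stand-in for the lines of list.txt
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    results = list(executor.map(process, lines))
# leaving the with-block waits for all submitted jobs, so the script
# exits cleanly with no daemon flags or sentinel values
```

`executor.map` also preserves input order in `results`, which the hand-rolled worker loop does not guarantee.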