Python 线程和代码的一般问题

Issues with Python Threading and Code in general

给定以下 Python3 代码,带线程:

class main:
    def __init__(self):
        self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")
        self.hashlist = queue.Queue()
        self.filelist = queue.Queue()
        self.top = '/home/'
        for y in range(12):
            self.u = threading.Thread(target=self.md5hash)
            self.u.daemon = True
            self.u.start()
        for x in range(4):
            self.t = threading.Thread(target=self.threader)
            self.t.daemon = True
            self.t.start()
        main.body(self)

    def body(self):
        start = time.time()
        self.text.write("Time now is " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
        for root, dirs, files in os.walk(self.top):
            for f in files:
                path = os.path.join(root,f)
                self.filelist.put(path)
        self.t.join()
        self.u.join()
        self.text.write("Total time taken     : " + str(time.time() - start) + "\n")
        print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")

    def md5hash(self):
        while True:
            entry = self.filelist.get()
            //hashing//
            lists = finalhash + ',' + entry
            self.hashlist.put(lists)
            self.filelist.task_done()

    def compare(self, hashed, path):
        f = open(os.getcwd() + "/database.csv", 'r')
        for row in f:
            if row.split(':')[1] == hashed:
                print("Suspicious File!")
                print("Suspecfs: " + row.split(':')[2] + "File name : " + path)

    def threader(self):
        while True:
            item = self.hashlist.get()
            hashes = item.split(',')[0]
            path = item.split(',')[1]
            self.compare(hashes, path)
            self.hashlist.task_done()

main()

问题1:def body(self)中存在self.text.write("Time now is ...")行。此行不会出现在创建的日志文件中。

问题 2:在 def compare(self, hashed, path) 中,有一行在每次哈希冲突时打印 "Suspicious File!" 和 file path。由于 4 个线程 t 正在争夺谁先打印,所以此行总是打印乱序。为此,我想我需要知道如何让 Python 线程 运行 print 按顺序执行命令,而不是按他们喜欢的方式执行命令 - 如何?

问题3:在def body(self)中,存在行self.u.join()self.t.join()。据我所知,命令 join() 是等待线程终止后再继续的命令。线程都没有终止。

附加信息1:我正在编写多线程,因为我需要稍后将代码转换为多处理。

附加信息 2:如果您浏览时我对我的代码有任何误解 commands/syntax,请告诉我。

问题 1:您正在写入文件缓冲区 - 只有当缓冲区已满、文件句柄已关闭或您显式调用时,它才会刷新到实际文件flush() 就可以了(即 self.text.flush()

问题 2:您要么希望您的代码并行执行(实际上不是,但我们会做到这一点)但您失去了执行顺序,或者您连续执行以保持顺序。如果你想 运行 多个线程,让它们一个接一个地执行是没有意义的,因为这样你就不会并行执行你的代码,你还不如在主线程中执行所有内容。

如果你只想控制输出到STDOUT,只要它不干扰线程执行,你可以捕获你想要打印的内容,并在最后打印出来一个互斥体(所以只有一个thread writes at the time) 或者甚至通过管道将其返回到主线程并让它管理对 STDOUT 的访问。一个简单的互斥量示例是:

PRINT_MUTEX = threading.Lock()

def compare(self, hashed, path):  # never mind the inefficiency, we'll get to that later
    out = []  # hold our output buffer
    with open(os.getcwd() + "/database.csv", 'r') as f:
        for row in f:
            row = row.split(':')
            if row[1] == hashed:
                out.append("Suspicious File!")
                out.append("Suspecfs: " + row[2] + "File name : " + path)
    if out:
        with self.PRINT_MUTEX:  # use a lock to print out the results
            print("\n".join(out))

这不会保持线程执行的顺序(你也不应该试图破坏 'parallel' 执行的目的)但至少线程会输出它们的 compare 结果一次而不是散布他们的结果。如果你想让你的主 thread/process 控制 STDOUT,特别是因为你想把它转换成多处理代码,检查 .

问题 3:您的线程永远不会退出,因为它们卡在 while True 循环中 - 直到您脱离它,线程将保持 运行宁。我不知道你构建代码的方式背后的原因是什么,但如果你试图并行化文件列表(主线程)、读取、散列(md5hash 线程)和比较(threader 线程),你可能想要当没有更多文件时停止散列,当没有更多散列时停止比较。为此,您不能真正使用 Queue.task_done(),因为它会向其他 'listeners' 发出信号(如果它们被 Queue.join() 呼叫阻止,而您的呼叫不是),您已完成队列修改.

你应该为此使用一个 threading.Event 信号,但如果你想保留它 queue.Queue 只有你可以创建一个特殊的 属性 来表示你的队列的末尾,然后当没有什么要处理时将它放在队列中,然后让你的线程在遇到这个特殊的 属性 时退出它们的循环。让我们首先修复代码中的一个大疏忽 - 您根本没有存储对线程的引用,而是用最后一个线程覆盖它,因此您无法真正控制执行流程 - 而不是将最后一个线程引用存储在变量,将所有引用存储在列表中。此外,如果您要等待所有内容关闭,请不要使用守护线程:

def __init__(self):
    self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")  # consider os.path.join()
    self.hashlist = queue.Queue()
    self.filelist = queue.Queue()
    self.hashers = []  # hold the md5hash thread references
    self.comparators = []  # hold the threader thread references
    self.top = '/home/'
    for _ in range(12):  # you might want to consider a ThreadPool instead
        t = threading.Thread(target=self.md5hash)
        t.start()
        self.hashers.append(t)
    for _ in range(4):
        t = threading.Thread(target=self.threader)
        t.start()
        self.comparators.append(t)
    main.body(self)

现在我们可以修改 main.body() 方法,以便将上述特殊值添加到队列的末尾,以便工作线程知道何时停止:

QUEUE_CLOSE = object()  # a 'special' object to denote end-of-data in our queues

def body(self):
    start = time.time()
    self.text.write("Time:  " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
    for root, dirs, files in os.walk(self.top):
        for f in files:
            path = os.path.join(root, f)
            self.filelist.put(path)
    self.filelist.put(self.QUEUE_CLOSE)  # no more files, signal the end of the filelist
    for t in self.hashers:  # let's first wait for our hashing threads to exit
        t.join()
    # since we're not going to be receiving any new hashes, we can...
    self.hashlist.put(self.QUEUE_CLOSE)  # ... signal the end of the hashlist as well
    for t in self.comparators:  # let's wait for our comparator threads to exit
        t.join()
    self.text.write("Total: " + str(time.time() - start) + "\n")
    self.text.close()  # close the log file (this will also flush the previous content)
    print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")

因此我们需要修改工作线程,使其在遇到队列末尾时退出:

def md5hash(self):
    while self.filelist:
        entry = self.filelist.get()
        if entry is self.QUEUE_CLOSE:  # end of queue encountered
            self.filelist.put(self.QUEUE_CLOSE)  # put it back for the other threads
            break  # break away the processing
        finalhash = whatever_is_your_hash_code(entry)
        lists = finalhash + ',' + entry
        self.hashlist.put(lists)

def threader(self):
    while True:
        item = self.hashlist.get()
        if item is self.QUEUE_CLOSE:  # end of queue encountered
            self.hashlist.put(self.QUEUE_CLOSE)  # put it back for the other threads
            break  # break away the queue
        hashes = item.split(',')[0]
        path = item.split(',')[1]
        self.compare(hashes, path)

现在,如果你 运行 它,只要你未列出的散列部分正常工作,一切最终都会退出。

除了笨拙的设置之外,您绝对应该做的一件事是优化掉 main.compare() 方法 - 因为 CSV 文件在执行期间不会改变(如果发生变化,您应该在 -内存)加载整个 CSV 并循环遍历它以获取要比较的每个文件的哈希值是荒谬的。将整个 CSV 加载为 hash<=>whatever dict,然后当场进行比较(即 if hashed in your_map)。

最后,正如我上面提到的,是时候,嗯,给你的游行下雨了——所有这一切都是徒劳的!由于可怕的 GIL none 你的线程在这里并行执行(实际上,只有文件加载在一定程度上起作用,但任何优势都可能因散列数据所需的时间而受阻)。他们 运行 作为一个独立的、老实说的系统线程,但是 GIL 确保这些线程中只有一个线程 运行 一次 运行 所以这段代码,处理明智,很可能比如果您 运行 将所有内容都集中在一个线程中。这在多处理过程中对你没有太大帮助,因为你不能共享本地实例状态(好吧,你可以,检查 ,但它只是一个主要的 PITA,大多数时候不值得经历麻烦)。