Python 线程和代码的一般问题
Issues with Python Threading and Code in general
给定以下 Python3 代码,带线程:
class main:
def __init__(self):
self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")
self.hashlist = queue.Queue()
self.filelist = queue.Queue()
self.top = '/home/'
for y in range(12):
self.u = threading.Thread(target=self.md5hash)
self.u.daemon = True
self.u.start()
for x in range(4):
self.t = threading.Thread(target=self.threader)
self.t.daemon = True
self.t.start()
main.body(self)
def body(self):
start = time.time()
self.text.write("Time now is " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
for root, dirs, files in os.walk(self.top):
for f in files:
path = os.path.join(root,f)
self.filelist.put(path)
self.t.join()
self.u.join()
self.text.write("Total time taken : " + str(time.time() - start) + "\n")
print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")
def md5hash(self):
while True:
entry = self.filelist.get()
//hashing//
lists = finalhash + ',' + entry
self.hashlist.put(lists)
self.filelist.task_done()
def compare(self, hashed, path):
f = open(os.getcwd() + "/database.csv", 'r')
for row in f:
if row.split(':')[1] == hashed:
print("Suspicious File!")
print("Suspecfs: " + row.split(':')[2] + "File name : " + path)
def threader(self):
while True:
item = self.hashlist.get()
hashes = item.split(',')[0]
path = item.split(',')[1]
self.compare(hashes, path)
self.hashlist.task_done()
main()
问题1:def body(self)
中存在self.text.write("Time now is ...")
行。此行不会出现在创建的日志文件中。
问题 2:在 def compare(self, hashed, path)
中,有一行在每次哈希冲突时打印 "Suspicious File!" 和 file path
。由于 4 个线程 t
正在争夺谁先打印,所以此行总是打印乱序。为此,我想我需要知道如何让 Python 线程 运行 print
按顺序执行命令,而不是按他们喜欢的方式执行命令 - 如何?
问题3:在def body(self)
中,存在行self.u.join()
和self.t.join()
。据我所知,命令 join()
是等待线程终止后再继续的命令。线程都没有终止。
附加信息1:我正在编写多线程,因为我需要稍后将代码转换为多处理。
附加信息 2:如果您浏览时我对我的代码有任何误解 commands/syntax,请告诉我。
问题 1:您正在写入文件缓冲区 - 只有当缓冲区已满、文件句柄已关闭或您显式调用时,它才会刷新到实际文件flush()
就可以了(即 self.text.flush()
)
问题 2:您要么希望您的代码并行执行(实际上不是,但我们会做到这一点)但您失去了执行顺序,或者您连续执行以保持顺序。如果你想 运行 多个线程,让它们一个接一个地执行是没有意义的,因为这样你就不会并行执行你的代码,你还不如在主线程中执行所有内容。
如果你只想控制输出到STDOUT,只要它不干扰线程执行,你可以捕获你想要打印的内容,并在最后打印出来一个互斥体(所以只有一个thread writes at the time) 或者甚至通过管道将其返回到主线程并让它管理对 STDOUT 的访问。一个简单的互斥量示例是:
PRINT_MUTEX = threading.Lock()
def compare(self, hashed, path): # never mind the inefficiency, we'll get to that later
out = [] # hold our output buffer
with open(os.getcwd() + "/database.csv", 'r') as f:
for row in f:
row = row.split(':')
if row[1] == hashed:
out.append("Suspicious File!")
out.append("Suspecfs: " + row[2] + "File name : " + path)
if out:
with self.PRINT_MUTEX: # use a lock to print out the results
print("\n".join(out))
这不会保持线程执行的顺序(你也不应该试图破坏 'parallel' 执行的目的)但至少线程会输出它们的 compare
结果一次而不是散布他们的结果。如果你想让你的主 thread/process 控制 STDOUT,特别是因为你想把它转换成多处理代码,检查 .
问题 3:您的线程永远不会退出,因为它们卡在 while True
循环中 - 直到您脱离它,线程将保持 运行宁。我不知道你构建代码的方式背后的原因是什么,但如果你试图并行化文件列表(主线程)、读取、散列(md5hash 线程)和比较(threader 线程),你可能想要当没有更多文件时停止散列,当没有更多散列时停止比较。为此,您不能真正使用 Queue.task_done()
,因为它会向其他 'listeners' 发出信号(如果它们被 Queue.join()
呼叫阻止,而您的呼叫不是),您已完成队列修改.
你应该为此使用一个 threading.Event
信号,但如果你想保留它 queue.Queue
只有你可以创建一个特殊的 属性 来表示你的队列的末尾,然后当没有什么要处理时将它放在队列中,然后让你的线程在遇到这个特殊的 属性 时退出它们的循环。让我们首先修复代码中的一个大疏忽 - 您根本没有存储对线程的引用,而是用最后一个线程覆盖它,因此您无法真正控制执行流程 - 而不是将最后一个线程引用存储在变量,将所有引用存储在列表中。此外,如果您要等待所有内容关闭,请不要使用守护线程:
def __init__(self):
self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+") # consider os.path.join()
self.hashlist = queue.Queue()
self.filelist = queue.Queue()
self.hashers = [] # hold the md5hash thread references
self.comparators = [] # hold the threader thread references
self.top = '/home/'
for _ in range(12): # you might want to consider a ThreadPool instead
t = threading.Thread(target=self.md5hash)
t.start()
self.hashers.append(t)
for _ in range(4):
t = threading.Thread(target=self.threader)
t.start()
self.comparators.append(t)
main.body(self)
现在我们可以修改 main.body()
方法,以便将上述特殊值添加到队列的末尾,以便工作线程知道何时停止:
QUEUE_CLOSE = object() # a 'special' object to denote end-of-data in our queues
def body(self):
start = time.time()
self.text.write("Time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
for root, dirs, files in os.walk(self.top):
for f in files:
path = os.path.join(root, f)
self.filelist.put(path)
self.filelist.put(self.QUEUE_CLOSE) # no more files, signal the end of the filelist
for t in self.hashers: # let's first wait for our hashing threads to exit
t.join()
# since we're not going to be receiving any new hashes, we can...
self.hashlist.put(self.QUEUE_CLOSE) # ... signal the end of the hashlist as well
for t in self.comparators: # let's wait for our comparator threads to exit
t.join()
self.text.write("Total: " + str(time.time() - start) + "\n")
self.text.close() # close the log file (this will also flush the previous content)
print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")
因此我们需要修改工作线程,使其在遇到队列末尾时退出:
def md5hash(self):
while self.filelist:
entry = self.filelist.get()
if entry is self.QUEUE_CLOSE: # end of queue encountered
self.filelist.put(self.QUEUE_CLOSE) # put it back for the other threads
break # break away the processing
finalhash = whatever_is_your_hash_code(entry)
lists = finalhash + ',' + entry
self.hashlist.put(lists)
def threader(self):
while True:
item = self.hashlist.get()
if item is self.QUEUE_CLOSE: # end of queue encountered
self.hashlist.put(self.QUEUE_CLOSE) # put it back for the other threads
break # break away the queue
hashes = item.split(',')[0]
path = item.split(',')[1]
self.compare(hashes, path)
现在,如果你 运行 它,只要你未列出的散列部分正常工作,一切最终都会退出。
除了笨拙的设置之外,您绝对应该做的一件事是优化掉 main.compare()
方法 - 因为 CSV 文件在执行期间不会改变(如果发生变化,您应该在 -内存)加载整个 CSV 并循环遍历它以获取要比较的每个文件的哈希值是荒谬的。将整个 CSV 加载为 hash<=>whatever
dict
,然后当场进行比较(即 if hashed in your_map
)。
最后,正如我上面提到的,是时候,嗯,给你的游行下雨了——所有这一切都是徒劳的!由于可怕的 GIL none 你的线程在这里并行执行(实际上,只有文件加载在一定程度上起作用,但任何优势都可能因散列数据所需的时间而受阻)。他们 运行 作为一个独立的、老实说的系统线程,但是 GIL 确保这些线程中只有一个线程 运行 一次 运行 所以这段代码,处理明智,很可能比如果您 运行 将所有内容都集中在一个线程中。这在多处理过程中对你没有太大帮助,因为你不能共享本地实例状态(好吧,你可以,检查 ,但它只是一个主要的 PITA,大多数时候不值得经历麻烦)。
给定以下 Python3 代码,带线程:
class main:
def __init__(self):
self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")
self.hashlist = queue.Queue()
self.filelist = queue.Queue()
self.top = '/home/'
for y in range(12):
self.u = threading.Thread(target=self.md5hash)
self.u.daemon = True
self.u.start()
for x in range(4):
self.t = threading.Thread(target=self.threader)
self.t.daemon = True
self.t.start()
main.body(self)
def body(self):
start = time.time()
self.text.write("Time now is " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
for root, dirs, files in os.walk(self.top):
for f in files:
path = os.path.join(root,f)
self.filelist.put(path)
self.t.join()
self.u.join()
self.text.write("Total time taken : " + str(time.time() - start) + "\n")
print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")
def md5hash(self):
while True:
entry = self.filelist.get()
//hashing//
lists = finalhash + ',' + entry
self.hashlist.put(lists)
self.filelist.task_done()
def compare(self, hashed, path):
f = open(os.getcwd() + "/database.csv", 'r')
for row in f:
if row.split(':')[1] == hashed:
print("Suspicious File!")
print("Suspecfs: " + row.split(':')[2] + "File name : " + path)
def threader(self):
while True:
item = self.hashlist.get()
hashes = item.split(',')[0]
path = item.split(',')[1]
self.compare(hashes, path)
self.hashlist.task_done()
main()
问题1:def body(self)
中存在self.text.write("Time now is ...")
行。此行不会出现在创建的日志文件中。
问题 2:在 def compare(self, hashed, path)
中,有一行在每次哈希冲突时打印 "Suspicious File!" 和 file path
。由于 4 个线程 t
正在争夺谁先打印,所以此行总是打印乱序。为此,我想我需要知道如何让 Python 线程 运行 print
按顺序执行命令,而不是按他们喜欢的方式执行命令 - 如何?
问题3:在def body(self)
中,存在行self.u.join()
和self.t.join()
。据我所知,命令 join()
是等待线程终止后再继续的命令。线程都没有终止。
附加信息1:我正在编写多线程,因为我需要稍后将代码转换为多处理。
附加信息 2:如果您浏览时我对我的代码有任何误解 commands/syntax,请告诉我。
问题 1:您正在写入文件缓冲区 - 只有当缓冲区已满、文件句柄已关闭或您显式调用时,它才会刷新到实际文件flush()
就可以了(即 self.text.flush()
)
问题 2:您要么希望您的代码并行执行(实际上不是,但我们会做到这一点)但您失去了执行顺序,或者您连续执行以保持顺序。如果你想 运行 多个线程,让它们一个接一个地执行是没有意义的,因为这样你就不会并行执行你的代码,你还不如在主线程中执行所有内容。
如果你只想控制输出到STDOUT,只要它不干扰线程执行,你可以捕获你想要打印的内容,并在最后打印出来一个互斥体(所以只有一个thread writes at the time) 或者甚至通过管道将其返回到主线程并让它管理对 STDOUT 的访问。一个简单的互斥量示例是:
PRINT_MUTEX = threading.Lock()
def compare(self, hashed, path): # never mind the inefficiency, we'll get to that later
out = [] # hold our output buffer
with open(os.getcwd() + "/database.csv", 'r') as f:
for row in f:
row = row.split(':')
if row[1] == hashed:
out.append("Suspicious File!")
out.append("Suspecfs: " + row[2] + "File name : " + path)
if out:
with self.PRINT_MUTEX: # use a lock to print out the results
print("\n".join(out))
这不会保持线程执行的顺序(你也不应该试图破坏 'parallel' 执行的目的)但至少线程会输出它们的 compare
结果一次而不是散布他们的结果。如果你想让你的主 thread/process 控制 STDOUT,特别是因为你想把它转换成多处理代码,检查
问题 3:您的线程永远不会退出,因为它们卡在 while True
循环中 - 直到您脱离它,线程将保持 运行宁。我不知道你构建代码的方式背后的原因是什么,但如果你试图并行化文件列表(主线程)、读取、散列(md5hash 线程)和比较(threader 线程),你可能想要当没有更多文件时停止散列,当没有更多散列时停止比较。为此,您不能真正使用 Queue.task_done()
,因为它会向其他 'listeners' 发出信号(如果它们被 Queue.join()
呼叫阻止,而您的呼叫不是),您已完成队列修改.
你应该为此使用一个 threading.Event
信号,但如果你想保留它 queue.Queue
只有你可以创建一个特殊的 属性 来表示你的队列的末尾,然后当没有什么要处理时将它放在队列中,然后让你的线程在遇到这个特殊的 属性 时退出它们的循环。让我们首先修复代码中的一个大疏忽 - 您根本没有存储对线程的引用,而是用最后一个线程覆盖它,因此您无法真正控制执行流程 - 而不是将最后一个线程引用存储在变量,将所有引用存储在列表中。此外,如果您要等待所有内容关闭,请不要使用守护线程:
def __init__(self):
self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+") # consider os.path.join()
self.hashlist = queue.Queue()
self.filelist = queue.Queue()
self.hashers = [] # hold the md5hash thread references
self.comparators = [] # hold the threader thread references
self.top = '/home/'
for _ in range(12): # you might want to consider a ThreadPool instead
t = threading.Thread(target=self.md5hash)
t.start()
self.hashers.append(t)
for _ in range(4):
t = threading.Thread(target=self.threader)
t.start()
self.comparators.append(t)
main.body(self)
现在我们可以修改 main.body()
方法,以便将上述特殊值添加到队列的末尾,以便工作线程知道何时停止:
QUEUE_CLOSE = object() # a 'special' object to denote end-of-data in our queues
def body(self):
start = time.time()
self.text.write("Time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
for root, dirs, files in os.walk(self.top):
for f in files:
path = os.path.join(root, f)
self.filelist.put(path)
self.filelist.put(self.QUEUE_CLOSE) # no more files, signal the end of the filelist
for t in self.hashers: # let's first wait for our hashing threads to exit
t.join()
# since we're not going to be receiving any new hashes, we can...
self.hashlist.put(self.QUEUE_CLOSE) # ... signal the end of the hashlist as well
for t in self.comparators: # let's wait for our comparator threads to exit
t.join()
self.text.write("Total: " + str(time.time() - start) + "\n")
self.text.close() # close the log file (this will also flush the previous content)
print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")
因此我们需要修改工作线程,使其在遇到队列末尾时退出:
def md5hash(self):
while self.filelist:
entry = self.filelist.get()
if entry is self.QUEUE_CLOSE: # end of queue encountered
self.filelist.put(self.QUEUE_CLOSE) # put it back for the other threads
break # break away the processing
finalhash = whatever_is_your_hash_code(entry)
lists = finalhash + ',' + entry
self.hashlist.put(lists)
def threader(self):
while True:
item = self.hashlist.get()
if item is self.QUEUE_CLOSE: # end of queue encountered
self.hashlist.put(self.QUEUE_CLOSE) # put it back for the other threads
break # break away the queue
hashes = item.split(',')[0]
path = item.split(',')[1]
self.compare(hashes, path)
现在,如果你 运行 它,只要你未列出的散列部分正常工作,一切最终都会退出。
除了笨拙的设置之外,您绝对应该做的一件事是优化掉 main.compare()
方法 - 因为 CSV 文件在执行期间不会改变(如果发生变化,您应该在 -内存)加载整个 CSV 并循环遍历它以获取要比较的每个文件的哈希值是荒谬的。将整个 CSV 加载为 hash<=>whatever
dict
,然后当场进行比较(即 if hashed in your_map
)。
最后,正如我上面提到的,是时候,嗯,给你的游行下雨了——所有这一切都是徒劳的!由于可怕的 GIL none 你的线程在这里并行执行(实际上,只有文件加载在一定程度上起作用,但任何优势都可能因散列数据所需的时间而受阻)。他们 运行 作为一个独立的、老实说的系统线程,但是 GIL 确保这些线程中只有一个线程 运行 一次 运行 所以这段代码,处理明智,很可能比如果您 运行 将所有内容都集中在一个线程中。这在多处理过程中对你没有太大帮助,因为你不能共享本地实例状态(好吧,你可以,检查