Multiprocessing acting up in Python 3
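I'm working on a zip file cracker and decided to use the multiprocessing module to speed it up. It was a complete pain, since this was my first time using the module and I still don't fully understand it. However, I got it to work.
The problem is that it doesn't get through the whole word list; it just stops at random places in the word list, and if the password is found, it keeps going through the word list instead of stopping the process.
Does anyone know why it behaves this way?
Source code for the ZipFile cracker: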
#!/usr/bin/env python3

import multiprocessing as mp
import zipfile # Handling the zipfile
import sys # Command line arguments, and quitting application
import time # To calculate runtime

def usage(program_name):
    print("Usage: {0} <path to zipfile> <dictionary>".format(program_name))
    sys.exit(1)

def cracker(password):
    try:
        zFile.extractall(pwd=password)
        print("[+] Password Found! : {0}".format(password.decode('utf-8')))
        pool.close()
    except:
        pass

def main():
    global zFile
    global pool
    if len(sys.argv) < 3:
        usage(sys.argv[0])
    zFile = zipfile.ZipFile(sys.argv[1])
    print("[*] Started Cracking")
    startime = time.time()
    pool = mp.Pool()
    for i in open(sys.argv[2], 'r', errors='ignore'):
        pswd = bytes(i.strip('\n'), 'utf-8')
        pool.apply_async(cracker, (pswd,))
    print (pswd)
    runtime = round(time.time() - startime, 5)
    print ("[*] Runtime:", runtime, 'seconds')
    sys.exit(0)

if __name__ == "__main__":
    main()
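You are terminating your program too early. To test this, add a harmless time.sleep(10) to the cracker function and observe that your program still terminates within a second.
Call join to wait for the pool to finish: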
pool = mp.Pool()
for i in open(sys.argv[2], 'r', errors='ignore'):
    pswd = bytes(i.strip('\n'), 'utf-8')
    pool.apply_async(cracker, (pswd,))
pool.close() # Indicate that no more data is coming
pool.join() # Wait for pool to finish processing
runtime = round(time.time() - startime, 5)
print ("[*] Runtime:", runtime, 'seconds')
sys.exit(0)
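As a minimal stand-alone sketch of the same pattern (slow_square and run_all are illustrative names, not part of the question's code, and the short sleep stands in for a password attempt), the results are only read after close() and join() have returned, so every submitted task has finished by then:

```python
import multiprocessing as mp
import time


def slow_square(n):
    """Stand-in for cracker(): pretend each task takes a little while."""
    time.sleep(0.05)
    return n * n


def run_all(numbers):
    """Submit every task, then block until the pool has drained its queue."""
    pool = mp.Pool(2)
    pending = [pool.apply_async(slow_square, (n,)) for n in numbers]
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the workers to finish everything queued so far
    # After join() every AsyncResult is ready, so get() returns immediately.
    return [result.get() for result in pending]


if __name__ == "__main__":
    print(run_all(range(5)))  # [0, 1, 4, 9, 16]
```

Without the join() call, the parent could reach sys.exit() while most of the tasks are still waiting in the queue, which matches the symptom described in the question.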
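In addition, once you have found the correct password, calling close only indicates that no more tasks will be submitted; all tasks that were already submitted will still run to completion. Instead, call terminate to kill the pool without processing any more tasks.
Furthermore, depending on the implementation details of multiprocessing.Pool, the global variable pool may not be available when you need it (and its value is not serializable anyway). To work around that, you can use a callback, as in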
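To make the difference between close and terminate concrete, here is a small sketch. The names nap and stop_early are made up for illustration, and the sleep stands in for one password attempt. It queues far more work than two workers could finish quickly and then calls terminate, which returns long before the queued tasks could have completed:

```python
import multiprocessing as mp
import time


def nap(seconds):
    """Stand-in for one password attempt."""
    time.sleep(seconds)
    return seconds


def stop_early(task_count=40, seconds=0.25):
    """Queue far more work than we intend to finish, then terminate."""
    pool = mp.Pool(2)
    for _ in range(task_count):
        pool.apply_async(nap, (seconds,))
    started = time.monotonic()
    pool.terminate()  # stop the workers; tasks still in the queue are dropped
    pool.join()       # reap the worker processes
    return time.monotonic() - started


if __name__ == "__main__":
    # Finishing all 40 tasks on 2 workers would take roughly 5 seconds.
    print("pool stopped after %.2f seconds" % stop_early())
```

Replacing terminate() with close() in this sketch would make join() wait for all 40 tasks.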
def cracker(password):
    try:
        zFile.extractall(pwd=password)
    except RuntimeError:
        return
    return password

def callback(found):
    if found:
        pool.terminate()
...
pool.apply_async(cracker, (pswd,), callback=callback)
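A runnable variation on the callback idea might look like the sketch below. Several things in it are assumptions for the sake of a self-contained example: SECRET replaces the real zip check, crack and on_result are invented names, and the callback only sets a threading.Event so that terminate() is called from the main thread rather than from inside the callback.

```python
import multiprocessing as mp
import threading

SECRET = b"hunter2"  # hypothetical stand-in for the real zip password


def cracker(password):
    """Return the password if it matches, otherwise None."""
    return password if password == SECRET else None


def crack(candidates):
    found = []
    done = threading.Event()

    def on_result(result):
        # Runs in a helper thread of the parent process, not in a worker.
        if result is not None:
            found.append(result)
            done.set()

    pool = mp.Pool(2)
    pending = [pool.apply_async(cracker, (c,), callback=on_result)
               for c in candidates]
    pool.close()
    # Wait until a match is reported or every task has finished.
    while not done.is_set() and not all(r.ready() for r in pending):
        done.wait(0.01)
    pool.terminate()  # called from the main thread, not from the callback
    pool.join()
    return found[0] if found else None


if __name__ == "__main__":
    print(crack([b"123456", b"hunter2", b"qwerty"]))
```

Because the callback runs in the parent process, it can safely touch parent-side state such as the found list, which a worker process could not do.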
Of course, since you are now looking at every result anyway, apply is not the right tool. Instead, you can write your code using imap_unordered:
with open(sys.argv[2], 'r', errors='ignore') as passf, \
         mp.Pool() as pool:
    passwords = (line.strip('\n').encode('utf-8') for line in passf)
    for found in pool.imap_unordered(cracker, passwords):
        if found:
            break
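For experimenting with this loop without a zip file at hand, a self-contained sketch could look as follows. SECRET and first_match are hypothetical names, and the comparison against SECRET replaces the real extraction attempt:

```python
import multiprocessing as mp

SECRET = b"letmein"  # hypothetical stand-in for the real zip password


def cracker(password):
    """Return the password if it matches, otherwise None."""
    return password if password == SECRET else None


def first_match(words):
    candidates = (word.encode("utf-8") for word in words)
    # Leaving the with-block terminates the pool, so returning early
    # also stops the remaining attempts.
    with mp.Pool(2) as pool:
        for found in pool.imap_unordered(cracker, candidates):
            if found:
                return found
    return None


if __name__ == "__main__":
    print(first_match(["password", "letmein", "dragon"]))
```

Results arrive in completion order rather than submission order, which is fine here because only the single matching result matters.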
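Instead of using globals, you may also want to open the zip file (and create a ZipFile object) in each process, by passing an initializer to the pool. Even better (and much faster), forgo all the I/O in the first place: read just the bytes you need once, and then pass them on to the children.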
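One way to sketch the initializer idea is shown below. It rests on several assumptions: init_worker, first_member_size, make_demo_zip and run_demo are invented names; the whole archive is passed as bytes rather than only "the bytes you need"; and the demo archive is unencrypted, because the standard zipfile module can read encrypted archives but cannot create them.

```python
import io
import multiprocessing as mp
import zipfile

_archive = None  # per-process ZipFile, set by init_worker()


def init_worker(zip_bytes):
    """Runs once in every worker: build a ZipFile from in-memory bytes."""
    global _archive
    _archive = zipfile.ZipFile(io.BytesIO(zip_bytes))


def first_member_size(_task):
    """Placeholder task: uses the per-process archive without disk I/O."""
    return _archive.infolist()[0].file_size


def make_demo_zip():
    """Build a tiny unencrypted archive in memory for the demonstration."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("hello.txt", "hello world")
    return buffer.getvalue()


def run_demo():
    zip_bytes = make_demo_zip()  # a real program would read the file once
    with mp.Pool(2, initializer=init_worker, initargs=(zip_bytes,)) as pool:
        return pool.map(first_member_size, range(4))


if __name__ == "__main__":
    print(run_demo())  # [11, 11, 11, 11]
```

In a real cracker, the placeholder task would be replaced by a function that tries a password against _archive, and the bytes would come from reading the target file once in the parent.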
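phihag's answer is the correct solution.
I just want to add some detail about calling terminate() once you have found the correct password. When I ran the code, the pool variable was not defined inside cracker(), so trying to call it from there simply raised an exception:
NameError: name 'pool' is not defined
(My fork() experience is weak, so I don't fully understand why the global zFile was copied to the child processes successfully while pool was not. Even if it had been copied, it would not be the same pool as in the parent process, right? So any method called on it would have no effect on the real pool in the parent process. Regardless, I prefer the advice listed in the Programming guidelines section of the multiprocessing module documentation: explicitly pass resources to child processes.)
My suggestion is to have cracker() return the password if it is correct and None otherwise. Then pass a callback to apply_async() that records the correct password and terminates the pool. Here is my take on modifying your code to do this: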
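A likely explanation, assuming the fork start method: the worker processes are created inside the mp.Pool() call, before its return value is assigned to the global pool, whereas zFile was assigned earlier and is therefore part of what each child inherits. The sketch below (counter, bump and run_demo are illustrative names) shows the second point from the paragraph above, namely that each worker only ever changes its own copy of a global:

```python
import multiprocessing as mp

counter = 0  # module-level global, like zFile or pool in the question


def bump(_task):
    """Runs in a worker: changes that worker's own copy of the global."""
    global counter
    counter += 1
    return counter


def run_demo():
    with mp.Pool(2) as pool:
        seen_in_workers = pool.map(bump, range(6))
    # The parent's copy was never touched by the workers.
    return seen_in_workers, counter


if __name__ == "__main__":
    seen_in_workers, seen_in_parent = run_demo()
    print("workers saw:", seen_in_workers)
    print("parent still has:", seen_in_parent)
```

The same reasoning applies to a pool object: even if a worker had a copy, calling close() or terminate() on that copy would not stop the parent's pool.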
#!/usr/bin/env python3

import multiprocessing as mp
import zipfile # Handling the zipfile
import sys # Command line arguments, and quitting application
import time # To calculate runtime
import os

def usage(program_name):
    print("Usage: {0} <path to zipfile> <dictionary>".format(program_name))
    sys.exit(1)

def cracker(zip_file_path, password):
    print('[*] Starting new cracker (pid={0}, password="{1}")'.format(os.getpid(), password))
    try:
        time.sleep(1) # XXX: to simulate the task taking a bit of time
        with zipfile.ZipFile(zip_file_path) as zFile:
            zFile.extractall(pwd=bytes(password, 'utf-8'))
        return password
    except:
        return None

def main():
    if len(sys.argv) < 3:
        usage(sys.argv[0])
    print('[*] Starting main (pid={0})'.format(os.getpid()))
    zip_file_path = sys.argv[1]
    password_file_path = sys.argv[2]
    startime = time.time()
    actual_password = None
    with mp.Pool() as pool:
        def set_actual_password(password):
            nonlocal actual_password
            if password:
                print('[*] Found password; stopping future tasks')
                pool.terminate()
                actual_password = password

        with open(password_file_path, 'r', errors='ignore') as password_file:
            for pswd in password_file:
                pswd = pswd.strip('\n')
                pool.apply_async(cracker, (zip_file_path, pswd,), callback=set_actual_password)

        pool.close()
        pool.join()

    if actual_password:
        print('[*] Cracked password: "{0}"'.format(actual_password))
    else:
        print('[*] Unable to crack password')
    runtime = round(time.time() - startime, 5)
    print("[*] Runtime:", runtime, 'seconds')
    sys.exit(0)

if __name__ == "__main__":
    main()
Here is an implementation of the advice given in the other answers:
#!/usr/bin/env python3
"""Brute force zip password.

Usage: brute-force-zip-password <zip archive> <passwords>
"""
import sys
from multiprocessing import Pool
from time import monotonic as timer
from zipfile import ZipFile

def init(archive): # run at the start of a worker process
    global zfile
    zfile = ZipFile(open(archive, 'rb')) # open file in each process once

def check(password):
    assert password
    try:
        with zfile.open(zfile.infolist()[0], pwd=password):
            return password # assume success
    except Exception as e:
        # match on the message text; its exact form differs between Python versions
        if 'Bad password for file' not in str(e):
            # assume all other errors happen after the password was accepted
            raise RuntimeError(password) from e

def main():
    if len(sys.argv) != 3:
        sys.exit(__doc__) # print usage
    start = timer()
    # decode passwords using the preferred locale encoding
    with open(sys.argv[2], errors='ignore') as file, \
         Pool(initializer=init, initargs=[sys.argv[1]]) as pool: # use all CPUs
        # check passwords encoded using utf-8
        passwords = (line.rstrip('\n').encode('utf-8') for line in file)
        passwords = filter(None, passwords) # filter empty passwords
        for password in pool.imap_unordered(check, passwords, chunksize=100):
            if password is not None: # found
                print("Password: '{}'".format(password.decode('utf-8')))
                break
        else:
            sys.exit('Unable to find password')
    print('Runtime: %.5f seconds' % (timer() - start,))

if __name__=="__main__":
    main()
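Notes:
- each worker process has its own ZipFile object, and the zip file is opened once per process: this should make the code more portable (Windows support) and improve time performance
- the content is not extracted: check(password) tries to open an archive member and immediately closes it on success: this is safer and should improve time performance (no need to create directories, etc.)
- all errors except 'Bad password for file' raised while decrypting the archive member are assumed to happen after the password has been accepted: the rationale is to avoid silencing unexpected errors; each exception should be considered individually
- check(password) expects a non-empty password
- the chunksize parameter may drastically improve performance
- the rarely used for/else syntax reports the case where the password is not found
- the with-statement calls pool.terminate() for you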
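Since the for/else construct mentioned in the notes is uncommon, here is a tiny sketch of how it behaves. The function find_password and its is_correct parameter are made up for illustration and do not appear in the answer's code:

```python
def find_password(candidates, is_correct):
    """Return the first matching candidate, or None if there is none."""
    for candidate in candidates:
        if is_correct(candidate):
            result = candidate
            break
    else:
        # Runs only when the loop finished without hitting break,
        # including the case where candidates is empty.
        result = None
    return result


if __name__ == "__main__":
    print(find_password(["abc", "secret", "xyz"], lambda c: c == "secret"))
    print(find_password(["abc", "xyz"], lambda c: c == "secret"))
```

In the script above, the else branch plays the same role: it is reached only when imap_unordered has been exhausted without a break, which is exactly the "password not found" case.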