Multiprocessing in Python 3

Multiprocessing acting up in Python 3

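I'm working on a zip file cracker and decided to use the multiprocessing module to speed it up. It was a complete pain, since this is my first time using the module and I don't fully understand it yet. Still, I got it to work.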

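The problem is that it doesn't finish the word list; it just stops at a random place in the list. And if the password is found, it keeps going through the word list instead of stopping.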

Does anyone know why it behaves this way?

Source code for the ZipFile cracker

#!/usr/bin/env python3

import multiprocessing as mp
import zipfile # Handling the zipfile
import sys # Command line arguments, and quitting application
import time # To calculate runtime

def usage(program_name):
    print("Usage: {0} <path to zipfile> <dictionary>".format(program_name))
    sys.exit(1)

def cracker(password):
    try:
        zFile.extractall(pwd=password)
        print("[+] Password Found! : {0}".format(password.decode('utf-8')))
        pool.close()
    except:
        pass

def main():
    global zFile
    global pool

    if len(sys.argv) < 3:
        usage(sys.argv[0])

    zFile = zipfile.ZipFile(sys.argv[1])

    print("[*] Started Cracking")

    startime = time.time()
    pool = mp.Pool()

    for i in open(sys.argv[2], 'r', errors='ignore'):
        pswd = bytes(i.strip('\n'), 'utf-8')
        pool.apply_async(cracker, (pswd,))

    print (pswd)
    runtime =  round(time.time() - startime, 5)
    print ("[*] Runtime:", runtime, 'seconds')
    sys.exit(0)

if __name__ == "__main__":
    main()

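You are terminating your program too early. To test this, add a harmless time.sleep(10) to the cracker function and observe that your program still terminates within a second.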

Call join to wait for the pool to finish:

pool = mp.Pool()
for i in open(sys.argv[2], 'r', errors='ignore'):
    pswd = bytes(i.strip('\n'), 'utf-8')
    pool.apply_async(cracker, (pswd,))

pool.close()  # Indicate that no more data is coming
pool.join()   # Wait for pool to finish processing

runtime =  round(time.time() - startime, 5)
print ("[*] Runtime:", runtime, 'seconds')
sys.exit(0)

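Also, once you have found the right password, calling close only signals that no more tasks will be submitted; every task already submitted will still run to completion. Call terminate instead to shut the pool down without processing any more tasks.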
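The difference between close and terminate can be seen in a small experiment. This is only a sketch: slow_task and count_finished are hypothetical names, and the sleep stands in for one password attempt. With close, every queued task should deliver a result; with terminate, the count depends on timing but should be much smaller.

```python
import multiprocessing as mp
import time

def slow_task(n):
    """Stand-in for one password attempt (hypothetical workload)."""
    time.sleep(0.05)
    return n

def count_finished(stop_early, tasks=40, workers=2):
    """Submit `tasks` jobs and report how many delivered a result."""
    finished = []
    pool = mp.Pool(workers)
    for n in range(tasks):
        # The callback runs in the parent process for each completed task
        pool.apply_async(slow_task, (n,), callback=finished.append)
    if stop_early:
        time.sleep(0.2)
        pool.terminate()  # discard queued tasks and stop the workers
    else:
        pool.close()      # no new tasks; queued ones still run
    pool.join()           # valid after either close() or terminate()
    return len(finished)

if __name__ == "__main__":
    print("close():    ", count_finished(stop_early=False))
    print("terminate():", count_finished(stop_early=True))
```

Note that join must follow either close or terminate; calling it on a pool that is still accepting work raises an error.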

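Furthermore, depending on the implementation details of multiprocessing.Pool, the global variable pool may not be available when you need it (and its value is not serializable anyway). To solve that problem, you can use a callback, as in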

def cracker(password):
    try:
        zFile.extractall(pwd=password)
    except RuntimeError:
        return
    return password

def callback(found):
    # Runs in the parent process, where pool is defined
    if found:
        pool.terminate()
...
pool.apply_async(cracker, (pswd,), callback=callback)

Of course, since you now look at every result anyway, apply is not the right way to go. Instead, you can write your code using imap_unordered:

with open(sys.argv[2], 'r', errors='ignore') as passf, \
     mp.Pool() as pool:
    passwords = (line.strip('\n').encode('utf-8') for line in passf)
    for found in pool.imap_unordered(cracker, passwords):
        if found:
            break

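Instead of using a global variable, you may also want to open the zip file (and create a ZipFile object) in each process, by using an initializer for the pool. Even better (and much faster), skip the repeated I/O altogether: read the bytes you need just once and pass them on to the children.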

phihag's answer is the correct solution.

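I just want to add more detail about calling terminate() once the correct password has been found. When I ran the code, the pool variable was not defined inside cracker(), so trying to call it from there just threw an exception: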

NameError: name 'pool' is not defined

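(My fork() experience is weak, so I don't fully understand why the global zFile is copied into the child processes successfully while pool is not. Even if it were copied, it would not be the same pool as in the parent process, right? So any method called on it would not affect the real pool in the parent. In any case, I prefer this advice from the Programming guidelines section of the multiprocessing documentation: explicitly pass resources to child processes.)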
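One likely explanation, assuming the fork start method: the workers are started inside the Pool() constructor, so they inherit only the globals that exist at that moment. zFile is assigned before the pool is created, while the name pool is bound only after the constructor returns. The sketch below illustrates this with two hypothetical globals, before and after; under fork the worker should report only before, and under spawn or forkserver it should report neither, because main() never runs in the child.

```python
import multiprocessing as mp

def visible_globals(_):
    """Report which of the two demo globals exist in the calling process."""
    return [name for name in ("before", "after") if name in globals()]

def main():
    global before, after
    before = "bound before the pool is created"
    # Workers are started inside the Pool() constructor, so under fork they
    # inherit only the globals that exist at this point.
    pool = mp.Pool(1)
    after = "bound after the workers already exist"
    try:
        print("worker sees:", pool.apply(visible_globals, (None,)))
        print("parent sees:", visible_globals(None))
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    main()
```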

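My suggestion is to have cracker() return the password if it is correct and None otherwise. Then pass a callback to apply_async() that records the correct password and terminates the pool. Here is my take on modifying your code to do this: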

#!/usr/bin/env python3

import multiprocessing as mp
import zipfile # Handling the zipfile
import sys # Command line arguments, and quitting application
import time # To calculate runtime
import os

def usage(program_name):
    print("Usage: {0} <path to zipfile> <dictionary>".format(program_name))
    sys.exit(1)

def cracker(zip_file_path, password):
    print('[*] Starting new cracker (pid={0}, password="{1}")'.format(os.getpid(), password))

    try:
        time.sleep(1) # XXX: to simulate the task taking a bit of time
        with zipfile.ZipFile(zip_file_path) as zFile:
            zFile.extractall(pwd=bytes(password, 'utf-8'))
        return password
    except:
        return None

def main():
    if len(sys.argv) < 3:
        usage(sys.argv[0])

    print('[*] Starting main (pid={0})'.format(os.getpid()))

    zip_file_path = sys.argv[1]
    password_file_path = sys.argv[2]
    startime = time.time()
    actual_password = None

    with mp.Pool() as pool:
        def set_actual_password(password):
            nonlocal actual_password
            if password:
                print('[*] Found password; stopping future tasks')
                pool.terminate()
                actual_password = password

        with open(password_file_path, 'r', errors='ignore') as password_file:
            for pswd in password_file:
                pswd = pswd.strip('\n')
                pool.apply_async(cracker, (zip_file_path, pswd,), callback=set_actual_password)

        pool.close()
        pool.join()

    if actual_password:
        print('[*] Cracked password: "{0}"'.format(actual_password))
    else:
        print('[*] Unable to crack password')
    runtime =  round(time.time() - startime, 5)
    print("[*] Runtime:", runtime, 'seconds')
    sys.exit(0)

if __name__ == "__main__":
    main()

Here is an implementation of the advice from the answers above:

#!/usr/bin/env python3
"""Brute force zip password.

Usage: brute-force-zip-password <zip archive> <passwords>
"""
import sys
from multiprocessing import Pool
from time import monotonic as timer
from zipfile import ZipFile

def init(archive): # run at the start of a worker process
    global zfile
    zfile = ZipFile(open(archive, 'rb')) # open file in each process once

def check(password):
    assert password
    try:
        with zfile.open(zfile.infolist()[0], pwd=password):
            return password # assume success
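    except Exception as e:
        # The message may carry the member name as a suffix (this varies
        # between Python versions), so compare by prefix, not by equality
        if not (e.args and str(e.args[0]).startswith('Bad password for file')):
            # assume all other errors happen after the password was accepted
            raise RuntimeError(password) from e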

def main():
    if len(sys.argv) != 3:
        sys.exit(__doc__) # print usage

    start = timer()
    # decode passwords using the preferred locale encoding
    with open(sys.argv[2], errors='ignore') as file, \
         Pool(initializer=init, initargs=[sys.argv[1]]) as pool: # use all CPUs
        # check passwords encoded using utf-8
        passwords = (line.rstrip('\n').encode('utf-8') for line in file)
        passwords = filter(None, passwords) # filter empty passwords
        for password in pool.imap_unordered(check, passwords, chunksize=100):
            if password is not None:  # found
                print("Password: '{}'".format(password.decode('utf-8')))
                break
        else:
            sys.exit('Unable to find password')
    print('Runtime: %.5f seconds' % (timer() - start,))

if __name__=="__main__":
    main()

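Notes:

  • each worker process has its own ZipFile object, and the zip file is opened once per process: this should make the code more portable (Windows support) and improve time performance
  • the content is not extracted: check(password) tries to open an archive member and closes it immediately on success: this is safer and should improve time performance (no need to create directories, etc.)
  • all errors other than 'Bad password for file' raised while decrypting the archive member are assumed to happen after the password has been accepted: the rationale is to avoid silencing unexpected errors; each exception should be considered individually
  • check(password) expects nonempty passwords
  • the chunksize parameter may improve performance drastically
  • the rarely used for/else syntax reports the case where no password is found
  • the with-statement calls pool.terminate() for you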
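For readers who have not met for/else before, here is a small standalone sketch of the construct. The function first_match and the sample words are hypothetical; the point is only that the else clause runs when the loop ends without hitting break.

```python
def first_match(candidates, is_match):
    """Return the first candidate accepted by is_match, or None."""
    for candidate in candidates:
        if is_match(candidate):
            break  # leaving via break skips the else clause
    else:
        # Runs only when the loop finished without hitting break,
        # including the case where candidates is empty
        return None
    return candidate

if __name__ == "__main__":
    words = [b"123456", b"hunter2", b"letmein"]
    print(first_match(words, lambda w: w == b"hunter2"))
    print(first_match(words, lambda w: False))
```

In the script above, the same pattern lets main() exit with an error message only when imap_unordered is exhausted without a hit.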