用于加密多个文件的多线程或多处理

multithreading or multiprocessing for encrypting multiple files

我创建了一个函数 enc()

def enc():
    password = bytes('asd123','utf-8')
    salt = bytes('asd123','utf-8')
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=10000,
        backend=default_backend())
    key = base64.urlsafe_b64encode(kdf.derive(password))
    f = Fernet(key)

    for file in files:
        with open(file,'rb') as original_file:
            original = original_file.read()

        encrypted = f.encrypt(original)

        with open (file,'wb') as encrypted_file:
            encrypted_file.write(encrypted)

循环遍历文件中的每个文件并对其进行加密。

files = ['D:/folder/asd.txt',
          'D:/folder/qwe.mp4',
          'D:/folder/qwe.jpg']

我想使用多线程或多处理来让它更快。可能吗? 需要一些代码方面的帮助。

我试过多线程

thread = threading.Thread(target=enc)
thread.start()
thread.join()

但它似乎并没有提高速度或时间。我需要一些帮助来实现多处理。 谢谢

您需要修改函数。

Python 不够聪明,无法知道您需要多处理代码的哪一部分。

很可能是for循环对了,你想并行加密文件。所以你可以尝试这样的事情。

定义每个循环需要运行的函数,然后在外面创建for循环。然后像这样使用多处理。

import multiprocessing

password = bytes('asd123','utf-8')
salt = bytes('asd123','utf-8')
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=10000,
        backend=default_backend())
key = base64.urlsafe_b64encode(kdf.derive(password))
f = Fernet(key)

def enc(file):
    with open(file,'rb') as original_file:
        original = original_file.read()

    encrypted = f.encrypt(original)

    with open (file,'wb') as encrypted_file:
        encrypted_file.write(encrypted)
    

if __name__ == '__main__':
    jobs = []
    for file in files:
        p = multiprocessing.Process(target=enc, args=(file,))
        jobs.append(p)
        p.start()

线程不是 CPU 密集型任务的最佳选择,除非任务正在执行,例如,由释放全局解释器锁的 C 语言库例程执行。无论如何,除非您 运行 多个 并行处理,否则您肯定会通过多线程或多处理获得任何性能提升。

假设您有 N 个任务和 M 个处理器来处理这些任务。如果任务是纯粹的 CPU 而没有 I/O (不完全是你的情况),那么启动超过 M 个进程来处理你的 N 任务没有任何优势,为此多处理池是理想的情况。当存在 CPU 和 I/O 的混合时, 可能 具有大于 M 的池大小是有利的,如果存在,甚至可能与 N 一样大很多 I/O 而很少 CPU。但在那种情况下,实际使用多线程池 多处理池(大小为 M)的组合会更好,其中多线程池用于所有 I/O 工作和 CPU 计算的多处理池。以下代码显示了该技术:

from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count
from functools import partial

def encrypt(key, b):
    f = Fernet(key)
    return f.encrypt(b)

def enc(key, process_pool, file):
    with open(file,'rb') as original_file:
        original = original_file.read()

    encrypted = process_pool.apply(encrypt, args=(key, original,))

    with open (file,'wb') as encrypted_file:
        encrypted_file.write(encrypted)


def main():
    password = bytes('asd123','utf-8')
    salt = bytes('asd123','utf-8')
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=10000,
        backend=default_backend())
    key = base64.urlsafe_b64encode(kdf.derive(password))

    files = ['D:/folder/asd.txt',
              'D:/folder/qwe.mp4',
              'D:/folder/qwe.jpg']

    # Too many threads may be counter productive due to disk contention
    # Should MAX_THREADS be unlimited?
    # For a solid-state drive with no physical arm movement,
    # an extremely large value, e.g. 500, probably would not hurt.
    # For "regular" drives, one needs to experiment
    MAX_THREADS = 500 # Essentially no limit
    # compute number of processes in our pool
    # the lesser of number of files to process and the number of cores we have:
    pool_size = min(MAX_THREADS, cpu_count(), len(files))
    # create process pool:
    process_pool = Pool(pool_size)
    # create thread pool:
    thread_pool = ThreadPool(len(files))
    worker = partial(enc, key, process_pool)
    thread_pool.map(worker, files)

if __name__ == '__main__':
    main()

评论

无论如何,重点是:假设您有 30 个文件和 4 个内核,而不是 3 个文件。 @anarchy 发布的解决方案将启动 30 个进程并计算 f 30 次,但实际上只能有效地利用 4 个处理器进行 f 的并行计算和进行加密。我的解决方案将使用 30 个线程来执行 I/O 但只启动 4 个进程,因此只计算 f 4 次。您节省了创建 26 个进程和 26 个无用的 f 计算。

除非您有固态驱动器,否则少于 30 个线程甚至可能更好,因为您的所有线程都在争用同一个驱动器,并且 (1) 每个文件可能位于驱动器上完全不同的位置并对此类文件执行并发 I/O 可能会适得其反,并且 (2) 任何特定驱动器都可以实现一些最大吞吐量。

所以也许我们应该:


    thread_pool = ThreadPool(min(len(files), MAX_THREADS))

其中 MAX_THREADS 设置为适合您的特定驱动器的某个最大值。

更新

现在 key 的昂贵计算只做了一次。

OP 的新问题 运行 TKinter

实际上你有两个问题。不仅打开了多个 windows,而且您在尝试调用多处理工作函数 encrypt 时可能还会遇到 pickle 错误,因为此类函数必须定义在全局范围,而不是像您所做的那样嵌套在另一个函数中。

在使用方法 spawn 创建新进程的平台上,例如 Windows,创建和初始化使用您的 process_pool = Pool(pool_size) 语句,创建一个新的空地址 space 并启动一个新的 Python 解释器,重新读取并重新执行源程序以初始化地址 space在最终调用 worker 函数测试之前。这意味着全局范围内的每个语句,即导入语句、变量声明、函数声明等,都是为此目的而执行的。但是,在新的子流程变量 __name__ 中不会是 '__main__' 因此 if __name__ == '__main__' : 块中的任何语句 在全局范围内 将不会被执行.顺便说一句,这就是为什么 Windows 平台在全局范围内最终导致创建新进程的代码被放置在这样一个块中的原因。如果不这样做,如果它以其他方式未被发现,将导致无限递归进程创建循环。但是你在嵌套函数中对 __name__ 进行了这样的检查,它没有任何用处。

但是意识到全局范围内的所有语句都将作为多处理池中每个进程初始化的一部分执行,理想情况下,您应该只在全局范围内拥有那些进程初始化所需的语句,或者在最少“无害”的陈述,即其存在不会 过度 执行成本或没有令人不快的副作用的陈述。有害语句也应放在 if __name__ == '__main__' : 块内或移至函数内。

现在应该清楚了,您创建主 window 的语句是“有害的”语句,您不希望每个新创建的进程都执行这些语句。您的代码的尾部应如下所示(我还合并了一个 MAX_THREADS 常量来限制将创建的最大线程数,尽管这里它设置得任意大——您应该尝试使用更小的值,例如如 3、5、10、20 等,看看什么能为您提供最佳吞吐量):

def passerrorbox():
    tk.messagebox.showerror('Password Error','Enter a Password')
    fipasswordbox.delete(0,'end')
    fisaltbox.delete(0,'end')
    filistbox.delete(0,'end')

# Changes start here:

# Get rid of all nesting of functions:
def encrypt(key, a):
    f = Fernet(key)
    return f.encrypt(a)

def enc(key, process_pool, file):
    # File Encryption
    with open(file,'rb') as original_file:
        original = original_file.read()

    encrypted = process_pool.apply(encrypt, args=(key, original,))

    with open (file,'wb') as encrypted_file:
        encrypted_file.write(encrypted)

def encfile(): # was previously named main
    password = bytes(fipasswordbox.get(), 'utf-8')
    salt = bytes(fisaltbox.get(),'utf-8')
    fileln = filistbox.get(0,'end')

    if len(fileln) == 0:
        fierrorbox()
    elif len(password) == 0:
        passerrorbox()
    else:
        file_enc_button['state']='disabled'
        browsefi['state']='disabled'

        fipasswordbox['state']='disabled'
        fisaltbox['state']='disabled'

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=10000,
            backend=default_backend())
        key = base64.urlsafe_b64encode(kdf.derive(password))

        # Too many threads may be counter productive due to disk contention
        # Should MAX_THREADS be unlimited?
        # For a solid-state drive with no physical arm movement,
        # an extremely large value, e.g. 500, probably would not hurt.
        # For "regular" drives, one needs to experiment
        MAX_THREADS = 500 # Essentially no limit
        pool_size = min(MAX_THREADS, cpu_count(), len(fileln))
        process_pool = Pool(pool_size)
        thread_pool = ThreadPool(min(MAX_THREADS, len(fileln)))
        worker = partial(enc, key, process_pool)
        thread_pool.map(worker, fileln)

        fiencdone()

if __name__ == '__main__':
    root = tk.Tk()
    fileframe()
    root.mainloop()