Same code, multithreading is 900x faster than multiprocessing

You need to download fonts.zip and extract it into the same folder as the example code in order to run it.

The purpose of this code is to generate random text, then render and save it as images. The code accepts letters and numbers, which are the populations of letters and digits to generate the text from. It also accepts character_frequency, which determines how many instances of each character will be generated. A long string is then produced, shuffled, and split into random-size substrings stored in the TextGenerator.dataset attribute by TextGenerator.initialize_dataset.

Ex: for letters = 'abc', numbers = '123', character_frequency = 3, 'aaabbbccc111222333' is generated, shuffled, and split into random-size substrings, ex: ['a312c', '1b1', 'bba32c3a2c'].
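The generate/shuffle/split step described above can be sketched independently of the class (a simplified version, ignoring the space-frequency and split_letters options; the function name and default sizes here are illustrative, not part of the original code):

```python
import random


def make_dataset(letters, numbers, character_frequency,
                 min_size=1, max_size=10):
    # Repeat every character `character_frequency` times, then shuffle.
    pool = list(''.join(c * character_frequency for c in letters + numbers))
    random.shuffle(pool)
    # Split the shuffled pool into random-size substrings.
    dataset, i = [], 0
    while i < len(pool) - min_size:
        size = random.randint(min_size, max_size)
        dataset.append(''.join(pool[i:i + size]))
        i += size
    return dataset


print(make_dataset('abc', '123', 3))
```

Each character of 'abc123' appears at most 3 times across the returned substrings; only the substring boundaries are random.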

Each word is then rendered and saved as an image by TextGenerator.write_images, which is the subject of this question.

The executor parameter passes either concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor to TextGenerator; both are used in the example below for demonstration purposes.

What is the problem?

The more character_frequency is increased, the longer it takes to build the dataset stored in TextGenerator.dataset; however, it should not affect performance afterwards. What actually happens is: the larger character_frequency is, the more time TextGenerator.write_images needs to complete with concurrent.futures.ProcessPoolExecutor. With concurrent.futures.ThreadPoolExecutor instead, everything else being equal, the time required is constant and unaffected by character_frequency.
import random
import string
import tempfile
import textwrap
from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
                                as_completed)
from pathlib import Path
from time import perf_counter

import numpy as np
import pandas as pd
import cv2
from PIL import Image, ImageDraw, ImageFont


class TextGenerator:
    def __init__(
        self,
        fonts,
        character_frequency,
        executor,
        max_images=None,
        background_colors=((255, 255, 255),),
        font_colors=((0, 0, 0),),
        font_sizes=(25,),
        max_example_size=25,
        min_example_size=1,
        max_chars_per_line=80,
        output_dir='data',
        workers=1,
        split_letters=False,
    ):
        assert (
            min_example_size > 0
        ), f'`min_example_size` should be > 0`, got {min_example_size}'
        assert (
            max_example_size > 0
        ), f'`max_example_size` should be > 0`, got {max_example_size}'
        self.fonts = fonts
        self.character_frequency = character_frequency
        self.executor = executor
        self.max_images = max_images
        self.background_colors = background_colors
        self.font_colors = font_colors
        self.font_sizes = font_sizes
        self.max_example_size = max_example_size
        self.min_example_size = min_example_size
        self.max_chars_per_line = max_chars_per_line
        self.output_dir = Path(output_dir)
        self.workers = workers
        self.split_letters = split_letters
        self.digits = len(f'{character_frequency}')
        self.max_font = max(font_sizes)
        self.generated_labels = []
        self.dataset = []
        self.dataset_size = 0

    def render_text(self, text_lines):
        font = random.choice(self.fonts)
        font_size = random.choice(self.font_sizes)
        background_color = random.choice(self.background_colors)
        font_color = random.choice(self.font_colors)
        max_width, total_height = 0, 0
        font = ImageFont.truetype(font, font_size)
        line_sizes = {}
        for line in text_lines:
            width, height = font.getsize(line)
            line_sizes[line] = width, height
            max_width = max(width, max_width)
            total_height += height
        image = Image.new('RGB', (max_width, total_height), background_color)
        draw = ImageDraw.Draw(image)
        current_height = 0
        for line_text, dimensions in line_sizes.items():
            draw.text((0, current_height), line_text, font_color, font=font)
            current_height += dimensions[1]
        return np.array(image)

    def display_progress(self, example_idx):
        print(
            f'\rGenerating example {example_idx + 1}/{self.dataset_size}',
            end='',
        )

    def generate_example(self, text_lines, example_idx):
        text_box = self.render_text(text_lines)
        filename = (self.output_dir / f'{example_idx:0{self.digits}d}.jpg').as_posix()
        cv2.imwrite(filename, text_box)
        return filename, text_lines

    def create_dataset_pool(self, executor, example_idx):
        future_items = []
        for j in range(self.workers):
            if not self.dataset:
                break
            text = self.dataset.pop()
            if text.strip():
                text_lines = textwrap.wrap(text, self.max_chars_per_line)
                future_items.append(
                    executor.submit(
                        self.generate_example,
                        text_lines,
                        j + example_idx,
                    )
                )
        return future_items

    def write_images(self):
        i = 0
        with self.executor(self.workers) as executor:
            while i < self.dataset_size:
                future_items = self.create_dataset_pool(executor, i)
                for future_item in as_completed(future_items):
                    filename, text_lines = future_item.result()
                    if filename:
                        self.generated_labels.append(
                            {'filename': filename, 'label': '\n'.join(text_lines)}
                        )
                    self.display_progress(i)
                i += min(self.workers, self.dataset_size - i)
                if self.max_images and i >= self.max_images:
                    break

    def initialize_dataset(self, letters, numbers, space_freq):
        for characters in letters, numbers:
            dataset = list(
                ''.join(
                    letter * self.character_frequency
                    for letter in characters + ' ' * space_freq
                )
            )
            random.shuffle(dataset)
            self.dataset.extend(dataset)
        i = 0
        temp_dataset = []
        min_split_example_size = min(self.max_example_size, self.max_chars_per_line)
        total_letters = len(self.dataset)
        while i < total_letters - self.min_example_size:
            example_size = random.randint(self.min_example_size, self.max_example_size)
            example = ''.join(self.dataset[i : i + example_size])
            temp_dataset.append(example)
            i += example_size
            if self.split_letters:
                split_example = ' '.join(list(example))
                for sub_example in textwrap.wrap(split_example, min_split_example_size):
                    if (sub_example_size := len(sub_example)) >= self.min_example_size:
                        temp_dataset.append(sub_example)
                        i += sub_example_size
        self.dataset = temp_dataset
        self.dataset_size = len(self.dataset)

    def generate(self, letters, numbers, space_freq, fp='labels.csv'):
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.initialize_dataset(letters, numbers, space_freq)
        t1 = perf_counter()
        self.write_images()
        t2 = perf_counter()
        print(
            f'\ntotal time: {t2 - t1} seconds, character frequency '
            f'specified: {self.character_frequency}, type: {self.executor.__name__}'
        )
        pd.DataFrame(self.generated_labels).to_csv(self.output_dir / fp, index=False)


if __name__ == '__main__':
    out = Path(tempfile.mkdtemp())
    total_images = 15
    for char_freq in [100, 1000, 1000000]:
        for ex in [ThreadPoolExecutor, ProcessPoolExecutor]:
            g = TextGenerator(
                [
                    p.as_posix()
                    for p in Path('fonts').glob('*.ttf')
                ],
                char_freq,
                ex,
                max_images=total_images,
                output_dir=out,
                max_example_size=15,
                min_example_size=5,
            )
            g.generate(string.ascii_letters, '0123456789', 1)

This produces the following results on my i5 MBP:

Generating example 15/649
total time: 0.0652076720000001 seconds, character frequency specified: 100, type: ThreadPoolExecutor
Generating example 15/656
total time: 1.1637316500000001 seconds, character frequency specified: 100, type: ProcessPoolExecutor
Generating example 15/6442
total time: 0.06430166800000015 seconds, character frequency specified: 1000, type: ThreadPoolExecutor
Generating example 15/6395
total time: 1.2626316840000005 seconds, character frequency specified: 1000, type: ProcessPoolExecutor
Generating example 15/6399805
total time: 0.05754961300000616 seconds, character frequency specified: 1000000, type: ThreadPoolExecutor
Generating example 15/6399726
total time: 45.18768219699999 seconds, character frequency specified: 1000000, type: ProcessPoolExecutor

0.05 seconds (threads) vs. 45 seconds (processes) to save 15 images with character_frequency = 1000000. Why does it take so long, and why is it affected by the value of character_frequency? It is independent of the rendering step and should only affect initialization time (which is exactly what happens with threads).

Assuming I interpreted your code correctly, you are generating sample text whose size is controlled by the value of character_frequency: the larger the value, the longer the text.

The text is generated in your program's main loop. You then schedule a set of tasks that receive that text and generate images from it.

Since processes live in separate memory address spaces, the text has to be sent to them through a pipe. That pipe is the bottleneck affecting performance. The reason you see performance degrade as character_frequency grows is that more text needs to be serialized and sent sequentially through that pipe. Your workers are starving while they wait for the data to arrive.
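A minimal sketch of this effect (the function and payload sizes are made up for illustration): the task itself does almost nothing, so any time difference between the two pools comes from pickling the argument and pushing it through the pipe to the worker processes.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def task(text):
    # Trivial work: the task is cheap on purpose, so any time
    # difference comes from how `text` reaches the worker.
    return len(text)


def time_pool(pool_cls, payload, n_tasks=4):
    # Submit the same payload n_tasks times and time the whole batch.
    start = time.perf_counter()
    with pool_cls(max_workers=2) as pool:
        futures = [pool.submit(task, payload) for _ in range(n_tasks)]
        results = [f.result() for f in futures]
    return time.perf_counter() - start, results


if __name__ == '__main__':
    payload = 'x' * 5_000_000  # ~5 MB string, pickled per task for processes
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        elapsed, _ = time_pool(cls, payload)
        print(f'{cls.__name__}: {elapsed:.3f}s')
```

The thread pool passes a reference to the same string object, while the process pool serializes and copies all ~5 MB per task, so the gap widens as the payload grows.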

This issue does not affect your thread pool, because threads live in the same memory address space as the main process. The data therefore does not need to be serialized and sent through your operating system.

To speed up your program when using processes, either move the text-generation logic into the workers themselves, or write the text to one or more files, then let the workers open those files on their own so you can take advantage of parallelized I/O. All your main process then does is point the workers to the right file locations or file names.
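The first suggestion might look like this (all names and parameters here are hypothetical, not taken from the question's code): only a small seed and a couple of scalars cross the pipe, and each worker regenerates its own text locally instead of receiving it pre-built from the main process.

```python
import random
from concurrent.futures import ProcessPoolExecutor


def build_and_render(seed, alphabet, example_size):
    # Hypothetical worker: rebuild the text from a seed instead of
    # receiving a potentially huge string through the pipe.
    rng = random.Random(seed)
    text = ''.join(rng.choice(alphabet) for _ in range(example_size))
    # ...render `text` to an image and save it here, as in the real code.
    return seed, text


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(build_and_render, i, 'abc123', 15)
                   for i in range(4)]
        for f in futures:
            print(f.result())
```

Because random.Random(seed) is deterministic, the main process can still reconstruct each worker's text afterwards for labeling, without ever shipping the text itself across the pipe.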