Tensorflow:如何使用与多个进程并行运行的 fit() 生成器

Tensorflow: How to use a generator for fit() which runs in parallel with multiple processes

我正在尝试在不适合我的 RAM 的数据集上训练模型。 因此,我正在使用继承自 tensorflow.keras.utils.Sequence 的数据生成器,如下所示。 这是工作。但是,因为我正在对图像进行处理,所以我的训练受到 CPU 限制。在查看 GPU-Z 时,我的 GPU 仅占 10-20%,但我的 CPU 核心之一已达到最大值。
为了解决这个问题,我试图 运行 在我所有的 16 个核心上并行生成发电机。但是,当我在 fit() 函数中设置 use_multiprocessing=True 时,程序会冻结。并且使用 workers=8 并不会加速这个过程,只会以不均匀的间隔生产批次。
前任。: 批次 1-8 被立即处理,而不是有一些延迟,批次 9-16 被处理。

下面的代码显示了我正在尝试做的事情。

#read the dataset
x, o_y = reader.read_dataset_whole(ETLCharacterGroups.kanji)

#split data into 90/10 percent parts
percentage = round(len(x) / 100 * 80)

x_train = x[:percentage]
x_test = x[percentage:]

y_train = o_y[:percentage]
y_test = o_y[percentage:]
def distort_sample(img : Image) -> (Image, [int], [int]):
    """
    Distort the given image randomly.

    Randomly applies the transformations:
        - rotation
        - shear
        - scale
        - translate
        - sharpen
        - blur

    Returns the distorted image.
    """

    offset, scale = (0, 0), (64, 64)

    t = random.choice(["sine"]) # "rotate", "shear", "scale", 
    f = random.choice(["blur", "sharpen", "smooth"])

    # randomly apply transformations...
    # rotate image
    if("rotate" in t):
        img = img.rotate(random.uniform(-30, 30))
    
    # shear image
    if("shear" in t):
        y_shear = random.uniform(-0.2, 0.2)
        x_shear = random.uniform(-0.2, 0.2)
        img = img.transform(img.size, PImage.AFFINE, (1, x_shear, 0, y_shear, 1, 0))
    
    # scale and translate image
    if("scale" in t):
        #scale the image
        size_x = random.randrange(20, 63)
        size_y = random.randrange(20, 63)
        scale = (size_x, size_y)
        offset = (math.ceil((64 - size_x) / 2), math.ceil((64 - size_y) / 2))
        img = img.resize(scale)

        # put it again on a black background (translated)
        background = PImage.new('L', (64, 64))
        trans_x = random.randrange(0, math.floor((64 - size_x)))
        trans_y = random.randrange(0, math.floor((64 - size_y)))
        offset = (trans_x, trans_y)
        background.paste(img, offset)
        img = background
    
    if("sine" in t):
        t_img = np.array(img)

        A = t_img.shape[0] / 3.0
        w = 2.0 / t_img.shape[1]

        shift = lambda x: random.uniform(0.15, 0.2) * A * np.sin(-2*np.pi*x * w)

        for i in range(t_img.shape[0]):
            t_img[:,i] = np.roll(t_img[:,i], int(shift(i)))

        img = PImage.fromarray(t_img)


    # blur
    if("blur" in f):
        img = img.filter(ImageFilter.GaussianBlur(radius=random.uniform(0.5, 1.2)))

    # sharpen
    if("sharpen" in f):
        img = img.filter(ImageFilter.SHARPEN)
        
    # smooth
    if("smooth" in f):
        img = img.filter(ImageFilter.SMOOTH)

    return img, offset, scale
class DataGenerator(tf.keras.utils.Sequence):

    def __init__(self, x_col, y_col, batch_size, mode="training",  shuffle=True):
        self.batch_size = batch_size
        self.undistorted_images = batch_size // 2
        self.shuffle = shuffle

        self.indices = len(x_col)
        self.x_col = x_col
        self.y_col = y_col

    def __len__(self):
        return self.indices // self.batch_size

    def on_epoch_end(self):
        if(False):
            rng_state = np.random.get_state()
            np.random.shuffle(x)
            np.random.set_state(rng_state)
            np.random.shuffle(o_y)
            
    def __getitem__(self, index):
        X, Y = [], []
        
        for i in range(index * self.undistorted_images, (index+1) * self.undistorted_images):
            base_img = self.x_col[i]
            img = PImage.fromarray(np.uint8(base_img.reshape(64, 64) * 255))
            # distort_sample() creates random variations of an image
            img, *unused = distort_sample(img)

            # add transformed image
            X.append(np.array(img).reshape(64, 64, 1))
            Y.append(self.y_col[i])
            
            # add base image
            X.append(base_img)
            Y.append(self.y_col[i])

        return np.array(X), np.array(Y)
#instantiate generators
training_generator = DataGenerator(x_col = x_train, y_col = y_train, batch_size = 256)
validation_generator = DataGenerator(x_col = x_test, y_col = y_test, batch_size = 256)
#train the model
hist = model.fit(
    x=training_generator,
    epochs=100,
    validation_data=training_generator,
    max_queue_size=50,
    workers=8,
    #use_multiprocessing=True   <- this freezes the program
)

最后我需要让数据生成器使用多重处理。为此,数组需要存储在共享内存中,而不是在子进程中使用。

import multiprocessing as mp
import numpy as np
from PIL import Image as PImage
from PIL import ImageFilter
import random
import math
import tensorflow as tf



shared_dict = {}


def distort_sample(img : PImage) -> (PImage, [int], [int]):
    """
    Distort the given image randomly.
    Randomly applies the transformations:
        rotation, shear, scale, translate, 
    Randomly applies the filter:
        sharpen, blur, smooth
    Returns the distorted image.
    """

    offset, scale = (0, 0), (64, 64)

    t = random.choice(["sine", "rotate", "shear", "scale"])
    f = random.choice(["blur", "sharpen", "smooth"])

    # randomly apply transformations...
    # rotate image
    if("rotate" in t):
        img = img.rotate(random.uniform(-15, 15))
    
    # shear image
    if("shear" in t):
        y_shear = random.uniform(-0.2, 0.2)
        x_shear = random.uniform(-0.2, 0.2)
        img = img.transform(img.size, PImage.AFFINE, (1, x_shear, 0, y_shear, 1, 0))
    
    # scale and translate image
    if("scale" in t):
        #scale the image
        size_x = random.randrange(25, 63)
        size_y = random.randrange(25, 63)
        scale = (size_x, size_y)
        offset = (math.ceil((64 - size_x) / 2), math.ceil((64 - size_y) / 2))
        img = img.resize(scale)

        # put it again on a black background (translated)
        background = PImage.new('L', (64, 64))
        trans_x = random.randrange(0, math.floor((64 - size_x)))
        trans_y = random.randrange(0, math.floor((64 - size_y)))
        offset = (trans_x, trans_y)
        background.paste(img, offset)
        img = background
    
    if("sine" in t):
        t_img = np.array(img)

        A = t_img.shape[0] / 3.0
        w = 2.0 / t_img.shape[1]

        shift_factor = random.choice([-1, 1]) * random.uniform(0.15, 0.2)
        shift = lambda x: shift_factor * A * np.sin(-2*np.pi*x * w)

        for i in range(t_img.shape[0]):
            t_img[:,i] = np.roll(t_img[:,i], int(shift(i)))

        img = PImage.fromarray(t_img)


    # blur
    if("blur" in f):
        img = img.filter(ImageFilter.GaussianBlur(radius=random.uniform(0.5, 1.2)))

    # sharpen
    if("sharpen" in f):
        img = img.filter(ImageFilter.SHARPEN)
        
    # smooth
    if("smooth" in f):
        img = img.filter(ImageFilter.SMOOTH)

    return img, offset, scale

def generator_func(start_index, end_index, x_shape, y_shape):
    X, Y = [], []
    
    x_loc = np.frombuffer(shared_dict["x"], dtype="float16").reshape(x_shape)
    y_loc = np.frombuffer(shared_dict["y"], dtype="b").reshape(y_shape)
    
    for i in range(start_index, end_index):
        base_img = x_loc[i]
        img = PImage.fromarray(np.uint8(base_img.reshape(64, 64) * 255))
        img, *unused = distort_sample(img)

        # add transformed image
        X.append(np.array(img).reshape(64, 64, 1))
        Y.append(y_loc[i])
        X.append(np.array(img).reshape(64, 64, 1))
        Y.append(y_loc[i])

        # add base image
        #X.append(base_img)
        #Y.append(y_loc[i])
        
    return X, Y

def generator_initializer(_x_shared, _y_shared):
    shared_dict["x"] = _x_shared
    shared_dict["y"] = _y_shared

def generator_func(start_index, end_index, x_shape, y_shape):
    X, Y = [], []
    
    x_loc = np.frombuffer(shared_dict["x"], dtype="float16").reshape(x_shape)
    y_loc = np.frombuffer(shared_dict["y"], dtype="b").reshape(y_shape)
    
    for i in range(start_index, end_index):
        base_img = x_loc[i]
        img = PImage.fromarray(np.uint8(base_img.reshape(64, 64) * 255))
        img, *unused = distort_sample(img)

        # add transformed image
        X.append(np.array(img).reshape(64, 64, 1))
        Y.append(y_loc[i])
        X.append(np.array(img).reshape(64, 64, 1))
        Y.append(y_loc[i])

        # add base image
        #X.append(base_img)
        #Y.append(y_loc[i])
        
    return X, Y

class DataGenerator(tf.keras.utils.Sequence):

    def __init__(self, num_samples, batch_size,
                        percentage, mode,
                        x_shared, y_shared,
                        x_np_shape, y_np_shape,
                        processes, shuffle=True):
        self.num_samples = num_samples
        # 50% original images + 50% augmented images 
        self.batch_size = batch_size // 2
        self.percentage = percentage

        # an offset to devide the data set into test and train
        self.start_index = 0
        if(mode == "testing"):
            self.start_index = num_samples - (num_samples // 100 * percentage)
        # is this a train or a test generator
        self.mode = mode
        # how many processes should be used for this generator
        self.processes = processes
        # should the arrays be shuffled after each epoch
        self.shuffle = shuffle

        self.x_np_shape = x_np_shape
        self.y_np_shape = y_np_shape
        
        # a pool of processes for generating augmented data
        self.pool = mp.Pool(processes=self.processes,
            initializer=generator_initializer,
            initargs=(x_shared, y_shared))
        
    def __len__(self):
        return (self.num_samples // 100 * self.percentage) // self.batch_size

    def on_epoch_end(self):
        if(False):
            rng_state = np.random.get_state()
            np.random.shuffle(x_np)
            np.random.set_state(rng_state)
            np.random.shuffle(y_np)
            
    def __getitem__(self, index):

        arguments = []
        slice_size = self.batch_size // self.processes
        current_batch = index * self.batch_size
        for i in range(self.processes):
            slice_start = self.start_index + (current_batch + i * slice_size)
            slice_end = self.start_index + (current_batch + (i+1) * slice_size)
            arguments.append([slice_start, slice_end, self.x_np_shape, self.y_np_shape])
        
        return_values = self.pool.starmap(generator_func, arguments)

        X, Y = [], []
        for imgs, labels in return_values:
            X.append(imgs)
            Y.append(labels)

        return np.concatenate(X).astype(np.float16), np.concatenate(Y).astype(np.float16)