Training speed is reduced when using a custom layer in tensorflow.keras.layers

I am using tf.data together with custom layers to work around a data-augmentation bottleneck, but I found that using tf.data alone is faster than mixing the two. I don't know what is going on inside the custom layer. Can someone tell me?

Thanks in advance!

Here is my data-augmentation code; it mainly does standardization and resizing.

def random_normalization(data, mean, std):
    # Randomly scale the channel mean and std by a factor in [0.5, 0.9)
    # before standardizing, as a light augmentation.
    mean = tf.multiply(mean, tf.random.uniform(shape=(), minval=0.5, maxval=0.9, dtype=tf.float64))
    std = tf.multiply(std, tf.random.uniform(shape=(), minval=0.5, maxval=0.9, dtype=tf.float64))
    return tf.divide(tf.subtract(data, mean), std)

def random_resize(data):
    # Downsample (2000, 6) -> (200, 6): average windows of 10 rows, each
    # randomly extended by `overlap` rows at both ends.
    def resizing(index, data, choice, enable, new_data, number, overlap):
        # Window start: extend backwards by `overlap` unless that underflows.
        FrontEnd = tf.cond(tf.math.greater_equal(tf.subtract(index, overlap), tf.constant(0)),
                           lambda: tf.subtract(index, overlap),
                           lambda: index)
        
        # Window end: extend forwards by `overlap` unless that overflows.
        BackEnd = tf.cond(tf.math.less(tf.add(tf.add(index, 10), overlap), tf.constant(2000)),
                          lambda: tf.add(tf.add(index, 10), overlap),
                          lambda: index)
        
        # Gather each of the 6 channels separately over the window
        # (one pair of tf.gather calls per channel).
        z1 = tf.gather(data, indices=[0], axis=1)
        z1 = tf.gather(z1, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z2 = tf.gather(data, indices=[1], axis=1)
        z2 = tf.gather(z2, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z3 = tf.gather(data, indices=[2], axis=1)
        z3 = tf.gather(z3, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z4 = tf.gather(data, indices=[3], axis=1)
        z4 = tf.gather(z4, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z5 = tf.gather(data, indices=[4], axis=1)
        z5 = tf.gather(z5, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z6 = tf.gather(data, indices=[5], axis=1)
        z6 = tf.gather(z6, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        # Write the six per-channel window means into row `number` of new_data.
        new_data = tf.tensor_scatter_nd_update(new_data, [[number, 0], [number, 1], [number, 2],
                                                          [number, 3], [number, 4], [number, 5]],
                                               [tf.math.reduce_mean(z1), tf.math.reduce_mean(z2),
                                                tf.math.reduce_mean(z3), tf.math.reduce_mean(z4),
                                                tf.math.reduce_mean(z5), tf.math.reduce_mean(z6)])
        
        # Step the window forward 10 rows and advance the output row counter.
        return tf.add(index, 10), data, choice, enable, new_data, tf.add(number, 1), overlap
    
    choice = tf.random.uniform(shape=(), minval=0, maxval=4, dtype=tf.int32)
    enable = tf.random.uniform(shape=(), minval=0, maxval=1, dtype=tf.float64)
    overlap = tf.random.uniform(shape=(), minval=5, maxval=21, dtype=tf.int32)
    
    new_data = tf.zeros((200, 6), dtype=tf.float64)
    index = tf.constant(0)
    number = tf.constant(0)
    condition = lambda index, data, choice, enable, new_data, number, overlap: tf.less(index, 2000)
    r = tf.while_loop(condition, resizing, loop_vars=(index, data, choice, enable, new_data, number, overlap))
    return r[4]  # new_data, the resized (200, 6) tensor

def normal_resize(data):
    # Deterministic 2000 -> 200 resize via bilinear image resizing.
    data = tf.reshape(data, (2000, 6, 1))
    data = tf.image.resize(data, size=[200, 6])
    return tf.cast(tf.reshape(data, (200, 6)), dtype=tf.float64)

def augmentation(data, labels):
    mean = tf.math.reduce_mean(data, axis=0)
    std = tf.math.reduce_std(data, axis=0)
    # With probability 0.8, standardize with randomly scaled mean/std;
    # otherwise use plain standardization.
    data = tf.cond(tf.random.uniform(shape=(), minval=0, maxval=1, dtype=tf.float64) < tf.constant(0.8, dtype=tf.float64),
                   lambda: random_normalization(data, mean, std),
                   lambda: tf.divide(tf.subtract(data, mean), std))

    # With probability 0.8, use the random windowed resize; otherwise the
    # deterministic resize (2000 -> 200).
    data = tf.cond(tf.random.uniform(shape=(), minval=0, maxval=1, dtype=tf.float64) < tf.constant(0.8, dtype=tf.float64),
                   lambda: random_resize(data),
                   lambda: normal_resize(data))

    return data, labels
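
For reference, if the random overlap is dropped, the same 2000 → 200 window-mean can be done in one vectorized op instead of a while loop. A minimal sketch (the helper name fixed_resize is mine, assuming fixed, non-overlapping windows of 10 rows):

def fixed_resize(data):
    # data: (2000, 6). Group consecutive rows into 200 windows of 10 rows
    # and average each window, producing (200, 6) in a single op.
    windows = tf.reshape(data, (200, 10, 6))
    return tf.reduce_mean(windows, axis=1)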

The main code, including the tf.data pipeline and the model:

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Input
from tensorflow.keras.models import Model

if __name__ == '__main__':
    # Dummy data: 3000 samples of shape (2000, 6), half labeled 0, half 1.
    trainDS = tf.data.Dataset.from_tensor_slices((np.random.rand(3000, 2000, 6),
                                                  np.concatenate((np.zeros((1500)), np.ones((1500))))))
    trainDS = (
        trainDS
        .cache()
        .shuffle(1000, reshuffle_each_iteration=False)
        .map(augmentation, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(128, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE))
    
    input = Input((200,6))
    x = LSTM(64, return_sequences=True)(input)
    output = Dense(1,activation='sigmoid')(x)
    model = Model(input, output)
    model.compile(optimizer='adam', loss='BinaryCrossentropy')
    model.fit(trainDS, epochs=3)

And here is the code for my custom layer. It is a bit tedious, but it achieves the effect I want.

import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer
import numpy as np

class CustomLayer(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    
    def execute(self, data, batch_size, new_data, _type):
        # Loop over the batch: augment (or just resize) each (2000, 6) sample
        # and write the resulting (200, 6) values into new_data.
        def _fun(index, data, _type, new_data):
            # `augmentation` here is the single-argument version shown below.
            resized = tf.cond(_type,
                              lambda: augmentation(tf.reshape(tf.gather(data, [index]), (2000, 6))),
                              lambda: normal_resize(tf.reshape(tf.gather(data, [index]), (2000, 6))))
            values = tf.reshape(resized, (1,-1))[0]
            _Indices = self.createIndices(index)
            new_data = tf.tensor_scatter_nd_update(new_data, _Indices, values)
            return tf.add(index,1), data, _type, new_data
        
        index = tf.constant(0)
        condition = lambda index, data, _type, new_data: tf.less(index, batch_size)
        r = tf.while_loop(condition, _fun, loop_vars=(index, data, _type, new_data))
        return r[-1]
    
    def createIndices(self, BatchSizeIndex):
        # Build the (1200, 3) index list [[BatchSizeIndex, i, j]] for
        # i in [0, 200) and j in [0, 6), using two nested while loops.
        def loop1(_i, BatchSizeIndex, col_num, _Indices):
            def loop2(_i, _j, BatchSizeIndex, col_num, _Indices):
                _Indices = tf.tensor_scatter_nd_update(_Indices, [[col_num, 0], [col_num, 1], [col_num, 2]], 
                                                        [BatchSizeIndex, _i, _j])
                return _i, tf.add(_j,1), BatchSizeIndex, tf.add(col_num,1), _Indices
            
            _j = tf.constant(0)
            condition_loop2 = lambda _i, _j, BatchSizeIndex, col_num, _Indices: tf.less(_j, 6)
            r_loop2 = tf.while_loop(condition_loop2, loop2, loop_vars=(_i, _j, BatchSizeIndex, col_num, _Indices))  
            return tf.add(_i,1), BatchSizeIndex, r_loop2[3], r_loop2[4]

        _Indices = tf.zeros((1200,3), dtype=tf.int32)
        col_num = tf.constant(0)
        _i = tf.constant(0)
        condition_loop1 = lambda _i, BatchSizeIndex, col_num, _Indices: tf.less(_i, 200)
        r_loop1 = tf.while_loop(condition_loop1, loop1, loop_vars=(_i, BatchSizeIndex, col_num, _Indices))
        return r_loop1[-1]
    
    def call(self, images, training=None):
        batch_size = tf.shape(images)[0]
        new_data = tf.zeros((batch_size, 200, 6), dtype=tf.float64)
        images = tf.cast(images, dtype=tf.float64)
        # Random augmentation during training; plain resize at inference.
        if training:
            data = self.execute(images, batch_size, new_data, tf.constant(True))
        else:
            data = self.execute(images, batch_size, new_data, tf.constant(False))
        
        return data
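
As an aside, the nested while loops in createIndices just enumerate a fixed 200 × 6 grid, so a loop-free equivalent is possible. A minimal sketch (hypothetical helper, assuming batch_index is an int32 scalar tensor):

def create_indices_vectorized(batch_index):
    # Same (1200, 3) index list [[batch_index, i, j]] as createIndices,
    # built without any while loop.
    i, j = tf.meshgrid(tf.range(200), tf.range(6), indexing='ij')      # each (200, 6)
    ij = tf.stack([tf.reshape(i, [-1]), tf.reshape(j, [-1])], axis=1)  # (1200, 2)
    b = tf.fill((1200, 1), batch_index)                                # (1200, 1)
    return tf.concat([b, ij], axis=1)                                  # (1200, 3)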

The final main code can then be modified to run like this (note that augmentation now takes only data, since the labels stay in the dataset):

def augmentation(data):
    .....
    return data

if __name__ == '__main__':
    trainDS = tf.data.Dataset.from_tensor_slices((np.random.rand(3000,2000,6),
                                                  np.concatenate((np.zeros((1500)),np.ones((1500))))))
    trainDS = (
        trainDS
        .cache()
        .shuffle(1000, reshuffle_each_iteration=False)
        .batch(128, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE))
    
    input = Input((2000,6))
    x = CustomLayer()(input)
    x = LSTM(64, return_sequences=True)(x)
    output = Dense(1,activation='sigmoid')(x)
    model = Model(input, output)
    model.compile(optimizer='adam', loss='BinaryCrossentropy')
    model.fit(trainDS, epochs=3)

Result: tf.data alone takes about 18 s; tf.data + CustomLayer takes about 38 s.

What I want to clarify is that augmentation run through tf.data's map executes on the CPU, whereas if I write the augmentation as a Layer, it should in theory run on the GPU. Why is the gap between the two so large?
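
To see how much of the epoch time is the input pipeline itself, the tf.data version can be iterated without the model. A minimal sketch (hypothetical timing code, not part of my original measurements):

import time

start = time.perf_counter()
for batch, labels in trainDS:  # pure pipeline pass, no training step
    pass
print(f"pipeline only: {time.perf_counter() - start:.2f}s")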

Environment: Python 3.6, TensorFlow 2.4.0

Update: I reduced the number of tf.gather calls in the code and replaced tf.tensor_scatter_nd_update with tf.TensorArray, which genuinely improved my training speed: from about 2 min down to 2 s.
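
For anyone hitting the same problem, here is a minimal sketch of what that rewrite can look like (my random_resize redone with one tf.gather per window and a tf.TensorArray; the edge handling is simplified to clamping, which differs slightly from the original fallback):

def random_resize_fast(data):
    # data: (2000, 6) -> (200, 6), averaging windows of 10 rows that are
    # randomly extended by `overlap` rows on both sides.
    overlap = tf.random.uniform(shape=(), minval=5, maxval=21, dtype=tf.int32)
    ta = tf.TensorArray(tf.float64, size=200)

    def body(index, number, ta):
        front = tf.maximum(index - overlap, 0)         # clamp at the start
        back = tf.minimum(index + 10 + overlap, 2000)  # clamp at the end
        # One gather covering all 6 channels at once -> (window_len, 6).
        window = tf.gather(data, tf.range(front, back), axis=0)
        # Write the per-channel means instead of tensor_scatter_nd_update.
        ta = ta.write(number, tf.reduce_mean(window, axis=0))
        return index + 10, number + 1, ta

    cond = lambda index, number, ta: tf.less(index, 2000)
    _, _, ta = tf.while_loop(cond, body, loop_vars=(tf.constant(0), tf.constant(0), ta))
    return ta.stack()  # (200, 6)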