Custom Neural Network Implementation on MNIST using Tensorflow 2.0?

I tried to write a custom implementation of a basic neural network with two hidden layers on the MNIST dataset using *TensorFlow 2.0 beta*, but I'm not sure what went wrong here: my training loss and accuracy seem to be stuck at around 1.5 and around 85 respectively. However, if I build the same thing using Keras, I get very low training loss and accuracy above 95% within just 8-10 epochs.

I believe that maybe I'm not updating my weights or something? So do I need to assign the new weights that I compute in the backprop function back to their respective weights/bias variables?

I would really appreciate it if someone could help me out with this and the few other questions I've mentioned below.

A few more questions:

1) How do I add Dropout and Batch Normalization layers in this custom implementation? (i.e. making them work for both training and test time)

2) How can I use callbacks in this code? i.e. (making use of EarlyStopping and ModelCheckpoint callbacks)

3) Is there anything else in my code below that I can optimize further, like maybe making use of the tensorflow 2.x @tf.function decorator etc.?

4) I also need to extract the final weights that I obtain, for plotting and checking their distributions, to investigate issues like gradient vanishing or exploding. (Eg: maybe with Tensorboard)

5) I also want help in writing this code in a more generalized way so I can easily implement other networks like ConvNets (i.e. Conv, MaxPool etc.) based on this code.

Here's my complete code for easy reproducibility:

Note: I know I can use high-level API like Keras to build the model much easier but that is not my goal here. Please understand.

import numpy as np
import os
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import tensorflow as tf
import tensorflow_datasets as tfds

(x_train, y_train), (x_test, y_test) = tfds.load('mnist', split=['train', 'test'], 
                                                  batch_size=-1, as_supervised=True)

# reshaping
x_train = tf.reshape(x_train, shape=(x_train.shape[0], 784))
x_test  = tf.reshape(x_test, shape=(x_test.shape[0], 784))

ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# rescaling
ds_train = ds_train.map(lambda x, y: (tf.cast(x, tf.float32)/255.0, y))

class Model(object):
    def __init__(self, hidden1_size, hidden2_size, device=None):
        # layer sizes along with input and output
        self.input_size, self.output_size, self.device = 784, 10, device
        self.hidden1_size, self.hidden2_size = hidden1_size, hidden2_size
        self.lr_rate = 1e-03

        # weights initialization
        self.glorot_init = tf.initializers.glorot_uniform(seed=42)
        # weights b/w input to hidden1 --> 1
        self.w_h1 = tf.Variable(self.glorot_init((self.input_size, self.hidden1_size)))
        # weights b/w hidden1 to hidden2 ---> 2
        self.w_h2 = tf.Variable(self.glorot_init((self.hidden1_size, self.hidden2_size)))
        # weights b/w hidden2 to output ---> 3
        self.w_out = tf.Variable(self.glorot_init((self.hidden2_size, self.output_size)))

        # bias initialization
        self.b1 = tf.Variable(self.glorot_init((self.hidden1_size,)))
        self.b2 = tf.Variable(self.glorot_init((self.hidden2_size,)))
        self.b_out = tf.Variable(self.glorot_init((self.output_size,)))

        self.variables = [self.w_h1, self.b1, self.w_h2, self.b2, self.w_out, self.b_out]


    def feed_forward(self, x):
        if self.device is not None:
            with tf.device('gpu:0' if self.device=='gpu' else 'cpu'):
                # layer1
                self.layer1 = tf.nn.sigmoid(tf.add(tf.matmul(x, self.w_h1), self.b1))
                # layer2
                self.layer2 = tf.nn.sigmoid(tf.add(tf.matmul(self.layer1,
                                                             self.w_h2), self.b2))
                # output layer
                self.output = tf.nn.softmax(tf.add(tf.matmul(self.layer2,
                                                             self.w_out), self.b_out))
        return self.output

    def loss_fn(self, y_pred, y_true):
        self.loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y_true, 
                                                                  logits=y_pred)
        return tf.reduce_mean(self.loss)

    def acc_fn(self, y_pred, y_true):
        y_pred = tf.cast(tf.argmax(y_pred, axis=1), tf.int32)
        y_true = tf.cast(y_true, tf.int32)
        predictions = tf.cast(tf.equal(y_true, y_pred), tf.float32)
        return tf.reduce_mean(predictions)

    def backward_prop(self, batch_xs, batch_ys):
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.lr_rate)
        with tf.GradientTape() as tape:
            predicted = self.feed_forward(batch_xs)
            step_loss = self.loss_fn(predicted, batch_ys)
        grads = tape.gradient(step_loss, self.variables)
        optimizer.apply_gradients(zip(grads, self.variables))

n_shape = x_train.shape[0]
epochs = 20
batch_size = 128

ds_train = ds_train.repeat().shuffle(n_shape).batch(batch_size).prefetch(batch_size)

neural_net = Model(512, 256, 'gpu')

for epoch in range(epochs):
    no_steps = n_shape//batch_size
    avg_loss = 0.
    avg_acc = 0.
    for (batch_xs, batch_ys) in ds_train.take(no_steps):
        preds = neural_net.feed_forward(batch_xs)
        avg_loss += float(neural_net.loss_fn(preds, batch_ys)/no_steps) 
        avg_acc += float(neural_net.acc_fn(preds, batch_ys) /no_steps)
        neural_net.backward_prop(batch_xs, batch_ys)
    print(f'Epoch: {epoch}, Training Loss: {avg_loss}, Training ACC: {avg_acc}')

# output for 10 epochs:
Epoch: 0, Training Loss: 1.7005115111824125, Training ACC: 0.7603832868262543
Epoch: 1, Training Loss: 1.6052448933478445, Training ACC: 0.8524806404020637
Epoch: 2, Training Loss: 1.5905528008006513, Training ACC: 0.8664196092868224
Epoch: 3, Training Loss: 1.584107405738905, Training ACC: 0.8727630912326276
Epoch: 4, Training Loss: 1.5792385798413306, Training ACC: 0.8773203844903037
Epoch: 5, Training Loss: 1.5759121985174716, Training ACC: 0.8804754322627559
Epoch: 6, Training Loss: 1.5739163148682564, Training ACC: 0.8826455712551251
Epoch: 7, Training Loss: 1.5722616605926305, Training ACC: 0.8840812018606812
Epoch: 8, Training Loss: 1.569699136307463, Training ACC: 0.8867688354803249
Epoch: 9, Training Loss: 1.5679460542742163, Training ACC: 0.8885049475356936

Also, if there's something else I could improve in the code, do let me know as well.

Embrace the high-level API for something like this. You can do it in just a few lines of code and it's much easier to debug, read and reason about:

(x_train, y_train), (x_test, y_test) = tfds.load('mnist', split=['train', 'test'], 
                                                  batch_size=-1, as_supervised=True)

x_train = tf.cast(tf.reshape(x_train, shape=(x_train.shape[0], 784)), tf.float32)
x_test  = tf.cast(tf.reshape(x_test, shape=(x_test.shape[0], 784)), tf.float32)

model = tf.keras.models.Sequential([
  tf.keras.layers.Dense(512, activation='sigmoid'),
  tf.keras.layers.Dense(256, activation='sigmoid'),
  tf.keras.layers.Dense(10, activation='softmax')
])
# the model has to be compiled before fit
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)
model.evaluate(x_test, y_test)

I tried to write a custom implementation of a basic neural network with two hidden layers on the MNIST dataset using tensorflow 2.0 beta, but I'm not sure what went wrong here: my training loss and accuracy seem to be stuck at 1.5 and around 85 respectively.

Where did the training part go? Training of TF 2.0 models is either Keras' syntax or Eager execution with tf.GradientTape(). Can you paste the code with the conv and dense layers, and how you trained it?


As for your other questions:

1) How to add a Dropout layer in this custom implementation? i.e (making it work for both train and test time)

You can add a Dropout() layer with:

from tensorflow.keras.layers import Dropout

and then insert it into the Sequential() model with:

Dropout(dprob)     # where dprob = dropout probability
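For instance, a minimal sketch along the lines of the Sequential model shown above (the 0.2 rate is just an arbitrary example value):

model = tf.keras.models.Sequential([
  tf.keras.layers.Dense(512, activation='sigmoid'),
  Dropout(0.2),   # randomly zeroes 20% of activations during training
  tf.keras.layers.Dense(256, activation='sigmoid'),
  Dropout(0.2),
  tf.keras.layers.Dense(10, activation='softmax')
])

Keras toggles the layer for you: it is active inside model.fit() and acts as the identity inside model.evaluate() / model.predict(), which covers the train/test-time requirement.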

2) How to add Batch Normalization in this code?

Same as before, with:

from tensorflow.keras.layers import BatchNormalization

As for where to place batchnorm in the model, well, that's up to you. There is no rule of thumb; I suggest you experiment. With ML it's always a trial-and-error process.
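One possible placement, purely as an illustration (normalizing before the activation is a common choice, not a rule):

model = tf.keras.models.Sequential([
  tf.keras.layers.Dense(512),
  BatchNormalization(),                    # normalizes the pre-activations of the first Dense layer
  tf.keras.layers.Activation('sigmoid'),
  tf.keras.layers.Dense(256, activation='sigmoid'),
  tf.keras.layers.Dense(10, activation='softmax')
])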


3) How can I use callbacks in this code? i.e (making use of EarlyStopping and ModelCheckpoint callbacks)

If you are training with Keras' syntax, you can simply use them. Please check this very thorough tutorial on how to use them; it only takes a few lines of code. If you are running the model in Eager execution, you have to implement these techniques yourself, with your own code. It's more complex, but it also gives you more freedom in the implementation.
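A minimal sketch of both callbacks with Keras' fit (the monitored metric, patience and file name are arbitrary examples):

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

callbacks = [
    EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
    ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True),
]
model.fit(x_train, y_train, validation_split=0.1, epochs=50, callbacks=callbacks)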


4) Is there anything else in the code that I can optimize further in this code? i.e (making use of tensorflow 2.x @tf.function decorator etc.)

It depends. If you are using Keras' syntax, then I don't think you need to add anything more. If you are training the model in Eager execution, then I would suggest you use the @tf.function decorator on some of your functions to speed things up a bit. You can see a practical TF 2.0 example of how to use the decorator in this Notebook.
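As a tiny illustration of the syntax (the function below is a hypothetical example, not taken from your code):

@tf.function  # traced into a static graph on the first call, reused afterwards
def dense_forward(x, w, b):
    return tf.nn.sigmoid(tf.matmul(x, w) + b)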

Other than that, I suggest you use regularization techniques such as weight initialization, L1-L2 loss, etc.
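In Keras both can be specified per layer, for example (the initializer and penalty strength here are arbitrary choices):

tf.keras.layers.Dense(
    512,
    activation='sigmoid',
    kernel_initializer='he_uniform',                    # explicit weight initialization
    kernel_regularizer=tf.keras.regularizers.l2(1e-4),  # L2 penalty added to the loss
)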


5) Also I need a way to extract all my final weights for all layers after training so I can plot them and check their distributions. To check issues like gradient vanishing or exploding.

Once the model is trained, you can extract its weights with:

weights = model.get_weights()

or:

weights = model.trainable_weights

if you only want to keep the trainable ones.
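A minimal sketch of inspecting their distributions (assuming a trained Keras model and matplotlib installed):

import matplotlib.pyplot as plt

# model.get_weights() returns a list of numpy arrays (kernel, bias, kernel, bias, ...)
for i, w in enumerate(model.get_weights()):
    plt.hist(w.flatten(), bins=50)
    plt.title(f"Parameter tensor {i}, shape {w.shape}")
    plt.show()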


6) I also want help in writing this code in a more generalized way so I can easily implement other networks like convolutional network (i.e Conv, MaxPool etc.) based on this code easily.

You can pack all your code into a single function and then reuse it. At the end of this Notebook I did something like that (it's for a feed-forward NN, which is much simpler, but it's a start and you can change the code according to your needs).
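As a rough illustration of the idea (a hypothetical helper, not the notebook's code), the model definition itself can be turned into a small factory:

def build_model(hidden_units, n_classes=10):
    layers = [tf.keras.layers.Dense(units, activation='sigmoid') for units in hidden_units]
    layers.append(tf.keras.layers.Dense(n_classes, activation='softmax'))
    return tf.keras.models.Sequential(layers)

model = build_model([512, 256])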

---

UPDATE:

Please also check my TensorFlow 2.0 implementation of a CNN classifier. This might be a useful hint: it is trained on the Fashion MNIST dataset, which makes it quite similar to your task.

I wondered where to start with your multi-part question, and I decided to do so with a statement:

Your code definitely should not look like that and is nowhere near current Tensorflow best practices.

Sorry, but debugging it step by step is a waste of everyone's time and would not benefit either of us.

Now, moving to the third point:

  1. Is there anything else in my code below that I can optimize further in this code like maybe making use of tensorflow 2.x @tf.function decorator etc.)

Yes, you can use tensorflow2.0 functionality, and it seems like you are running away from it (the tf.function decorator is actually of no use here, leave it alone for the time being).

Following the new guidelines would also alleviate your 5th point, namely:

  1. I also want help in writing this code in a more generalized way so I can easily implement other networks like ConvNets (i.e Conv, MaxPool etc.) based on this code easily.

as it is designed exactly for that. After a little introduction I will try to walk you through those concepts in a few steps:

1. Divide your program into logical parts

Tensorflow did a lot of harm when it comes to code readability; everything in tf1.x was usually crunched in one place: globals followed by function definitions, followed by another set of globals or maybe data loading, all in all a mess. It's not really the developers' fault, as the system's design encouraged those behaviours.

Now, in tf2.0, programmers are encouraged to divide their work similarly to the structure one can see in pytorch, chainer and other more user-friendly frameworks.

1.1 Data loading

You were on the right track with Tensorflow Datasets, but you turned away from it for no apparent reason.

Here is your code with commentary on what's going on:

# You already have tf.data.Dataset objects after load
(x_train, y_train), (x_test, y_test) = tfds.load('mnist', split=['train', 'test'], 
                                                  batch_size=-1, as_supervised=True)

# But you are reshaping them in a strange manner...
x_train = tf.reshape(x_train, shape=(x_train.shape[0], 784))
x_test  = tf.reshape(x_test, shape=(x_test.shape[0], 784))

# And building from slices...
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Unreadable rescaling (there are built-ins for that)

You could easily generalize this idea for any dataset; place it in a separate module, say datasets.py:

import tensorflow as tf
import tensorflow_datasets as tfds


class ImageDatasetCreator:
    @classmethod
    # More portable and readable than dividing by 255
    def _convert_image_dtype(cls, dataset):
        return dataset.map(
            lambda image, label: (
                tf.image.convert_image_dtype(image, tf.float32),
                label,
            )
        )

    def __init__(self, name: str, batch: int, cache: bool = True, split=None):
        # Load dataset, every dataset has default train, test split
        dataset = tfds.load(name, as_supervised=True, split=split)
        # Convert to float range
        try:
            self.train = ImageDatasetCreator._convert_image_dtype(dataset["train"])
            self.test = ImageDatasetCreator._convert_image_dtype(dataset["test"])
        except KeyError as exception:
            raise ValueError(
                f"Dataset {name} does not have train and test, write your own custom dataset handler."
            ) from exception

        if cache:
            self.train = self.train.cache()  # speed things up considerably
            self.test = self.test.cache()

        self.batch: int = batch

    def get_train(self):
        # shuffle requires an explicit buffer size; 60000 covers the whole MNIST train split
        return self.train.shuffle(60000).batch(self.batch).repeat()

    def get_test(self):
        return self.test.batch(self.batch).repeat()

So now you can load more than just mnist using a simple command:

from datasets import ImageDatasetCreator

if __name__ == "__main__":
    dataloader = ImageDatasetCreator("mnist", batch=64, cache = True)
    train, test = dataloader.get_train(), dataloader.get_test()

You can use any name other than mnist to load the dataset you want.

Please, stop making everything deep-learning related into one-off hand-off scripts; you are a programmer as well.

1.2 Model creation

Since tf2.0, there are two advised ways one can proceed depending on the model's complexity:

  • tensorflow.keras.models.Sequential - this way was already shown in another answer, no need to reiterate. Used for the simplest models (you should use this one with your feedforward network).
  • Inherit from tensorflow.keras.Model and write a custom model. This one should be used when you have some kind of logic inside your module or it's more complicated (things like ResNets, multipath networks etc.). All in all, more readable and customizable.

Your Model class tried to resemble something like that, but it went south again; backprop is definitely not part of the model itself, and neither are loss or accuracy. Separate those into another module or function, definitely not a member!

That said, let's code the network using the second approach (you should place this code in model.py for brevity). Before that, I will code the YourDense feedforward layer from scratch by inheriting from tf.keras.layers.Layer (this one might go into the layers.py module):

import tensorflow as tf

class YourDense(tf.keras.layers.Layer):
    def __init__(self, units):
        # It's Python 3, you don't have to specify super parents explicitly
        super().__init__()
        self.units = units

    # Use build to create variables, as shape can be inferred from previous layers
    # If you were to create layers in __init__, one would have to provide input_shape
    # (same as it occurs in PyTorch for example)
    def build(self, input_shape):
        # You could use different initializers here as well
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        # You could define bias in __init__ as well as it's not input dependent
        self.bias = self.add_weight(shape=(self.units,), initializer="random_normal")
        # Oh, trainable=True is default

    def call(self, inputs):
        # Use overloaded operators instead of tf.add, better readability
        return tf.matmul(inputs, self.kernel) + self.bias

Regarding your

  1. How to add a Dropout and Batch Normalization layer in this custom implementation? (i.e making it work for both train and test time)

I assume you would like to create a custom implementation of those layers. If not, you can just import from tensorflow.keras.layers import Dropout and use it anywhere you want, as pointed out in another answer. Below is an inverted dropout with different behaviour during train and test:

class CustomDropout(tf.keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            # You could simply create binary mask and multiply here
            return tf.nn.dropout(inputs, rate=self.rate)
        # You would need to multiply by dropout rate if you were to do that
        return inputs

Taken from here and modified to better fit presentation purposes.
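If you do want the explicit binary mask mentioned in the comment above, a minimal sketch of inverted dropout could look like this (the class name MaskedDropout is purely illustrative):

class MaskedDropout(tf.keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            # Bernoulli mask: keep each unit with probability (1 - rate)
            mask = tf.cast(tf.random.uniform(tf.shape(inputs)) >= self.rate, inputs.dtype)
            # Scale kept activations so the expected sum stays the same (inverted dropout)
            return inputs * mask / (1.0 - self.rate)
        return inputs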

Now you can finally create your model (simple double feedforward):

import tensorflow as tf

from layers import YourDense


class Model(tf.keras.Model):
    def __init__(self):
        super().__init__()
        # Use Sequential here for readability
        self.network = tf.keras.Sequential(
            [YourDense(100), tf.keras.layers.ReLU(), YourDense(10)]
        )

    def call(self, inputs):
        # You can use non-parametric layers inside call as well
        flattened = tf.keras.layers.Flatten()(inputs)
        return self.network(flattened)

Of course, you should use built-ins as much as possible in general implementations.

This structure is pretty extensible, so it generalizes to convolutional nets, resnets, senets, whatever should be done via this module. You can read more about it here.

I think it fulfills your 5th point:

  1. I also want help in writing this code in a more generalized way so I can easily implement other networks like ConvNets (i.e Conv, MaxPool etc.) based on this code easily.

Lastly, you may have to use model.build(shape) in order to build your model's graph.

model.build((None, 28, 28, 1))

This corresponds to MNIST's 28x28x1 input shape, where None stands for the batch dimension.

1.3 Training

Once again, training can be done in two distinct ways:

  • standard Keras model.fit(dataset) - useful in simple tasks like classification
  • tf.GradientTape - more complicated training schemes, the most prominent example being Generative Adversarial Networks, where two models optimize orthogonal goals playing a minmax game

As pointed out once again, if you were to use the second way, you would not be able to simply use the callbacks provided by Keras, hence I'd advise sticking with the first option whenever possible.

In theory, you could call the callback's functions manually, like on_batch_begin() and others where needed, but it would be cumbersome and I'm not sure how well this would work.
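If you really had to, a rough sketch of driving a Keras callback by hand inside a custom loop might look like this (model is assumed to be a tf.keras.Model and val_loss_value a float you compute yourself each epoch; not battle-tested):

early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3)
early_stopping.set_model(model)
early_stopping.on_train_begin()
model.stop_training = False

for epoch in range(epochs):
    # ... run your tf.GradientTape training steps and compute val_loss_value ...
    early_stopping.on_epoch_end(epoch, logs={"val_loss": val_loss_value})
    if model.stop_training:
        break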

Speaking of the first option, you can use tf.data.Dataset objects directly with fit. Here it is presented inside another module (preferably train.py):

import datetime
import pathlib

import tensorflow as tf


def train(
    model: tf.keras.Model,
    path: str,
    train: tf.data.Dataset,
    epochs: int,
    steps_per_epoch: int,
    validation: tf.data.Dataset,
    steps_per_validation: int,
    stopping_epochs: int,
    optimizer=tf.optimizers.Adam(),
):
    model.compile(
        optimizer=optimizer,
        # I used logits as output from the last layer, hence this
        loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.metrics.SparseCategoricalAccuracy()],
    )

    model.fit(
        train,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        validation_data=validation,
        validation_steps=steps_per_validation,
        callbacks=[
            # Tensorboard logging
            tf.keras.callbacks.TensorBoard(
                pathlib.Path("logs")
                / pathlib.Path(datetime.datetime.now().strftime("%Y%m%d-%H%M%S")),
                histogram_freq=1,
            ),
            # Early stopping with best weights preserving
            tf.keras.callbacks.EarlyStopping(
                monitor="val_sparse_categorical_accuracy",
                patience=stopping_epochs,
                restore_best_weights=True,
            ),
        ],
    )
    model.save(path)

More complicated approaches are very similar (almost copy and paste) to PyTorch training loops, so if you are familiar with those, they should not pose much of a problem.

You can find examples throughout the tf2.0 docs, e.g. here or here.

2. Other things

2.1 Unanswered questions

  1. Is there anything else in the code that I can optimize further in this code? i.e (making use of tensorflow 2.x @tf.function decorator etc.)

The code above is already compiled into a graph, hence I don't think you would benefit from decorating it in this case. Premature optimization is the root of all evil; remember to measure your code before doing it.

You would gain much more with proper data caching (as described at the beginning of #1.1) and a good pipeline.

  1. Also I need a way to extract all my final weights for all layers after training so I can plot them and check their distributions. To check issues like gradient vanishing or exploding.

As pointed out above,

weights = model.get_weights()

will get you the weights. You may convert them to np.array and plot using seaborn, matplotlib, analyze them, check them or whatever else you want.

2.2 Putting it all together

All in all, your main.py (or entrypoint or something similar) would consist of this (more or less):

from datasets import ImageDatasetCreator
from model import Model
from train import train

# You could use argparse for things like batch, epochs etc.
if __name__ == "__main__":
    dataloader = ImageDatasetCreator("mnist", batch=64, cache=True)
    # renamed so the datasets don't shadow the imported train() function
    train_dataset, test_dataset = dataloader.get_train(), dataloader.get_test()
    model = Model()
    model.build((None, 28, 28, 1))
    train(
        model, path, train_dataset, epochs,
        len(train_dataset) // batch, test_dataset, len(test_dataset) // batch, ...
    )  # provide necessary arguments appropriately
    # Do whatever you want with those
    weights = model.get_weights()

Oh, and remember that the functions above are not meant for copy-pasting; treat them more like a guideline. Hit me up if you have any questions.

3. Questions from the comments

3.1 How to initialize custom and built-in layers

3.1.1 TLDR of what you are about to read

  • custom Poisson initialization function, but it takes three arguments
  • the tf.keras.initializers API needs two arguments (see the last point in their docs), hence one is specified via Python's lambda inside the custom layer we have written before
  • an optional bias for the layer is added, which can be turned off with a boolean

Why is it so complicated? To show that in tf2.0 you can finally use Python's functionality: no more graph hassle, if instead of tf.cond, etc.

3.1.2 From TLDR to implementation

Keras initializers can be found here and Tensorflow's flavor here.

Please note the API inconsistencies (capital letters like classes, lowercase with underscores like functions), especially in tf2.0, but that's beside the point.

You can use them either by passing a string (as is done in YourDense above) or during object creation.

To allow custom initialization in your custom layers, you can simply add an extra argument to the constructor (the tf.keras.Model class is still a Python class and its __init__ should be used the same way as any other Python one).

Before that, I will show you how to create a custom initialization:

# Poisson custom initialization because why not.
def my_dumb_init(shape, lam, dtype=None):
    return tf.squeeze(tf.random.poisson(shape, lam, dtype=dtype))

Notice that its signature takes three arguments, while it should take only (shape, dtype). Still, one can easily "fix" this when creating their own layer, like below (extended YourDense):

import typing

import tensorflow as tf


class YourDense(tf.keras.layers.Layer):
    # It's still Python, use it as Python, that's the point of tf.2.0
    @classmethod
    def register_initialization(cls, initializer):
        # Set defaults if init not provided by user
        if initializer is None:
            # let's make the signature proper for init in tf.keras
            return lambda shape, dtype: my_dumb_init(shape, 1, dtype)
        return initializer

    def __init__(
        self,
        units: int,
        bias: bool = True,
        # can be string or callable, some typing info added as well...
        kernel_initializer: typing.Union[str, typing.Callable] = None,
        bias_initializer: typing.Union[str, typing.Callable] = None,
    ):
        super().__init__()
        self.units: int = units
        self.kernel_initializer = YourDense.register_initialization(kernel_initializer)
        if bias:
            self.bias_initializer = YourDense.register_initialization(bias_initializer)
        else:
            self.bias_initializer = None

    def build(self, input_shape):
        # Simply pass your init here
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer=self.kernel_initializer,
            trainable=True,
        )
        if self.bias_initializer is not None:
            self.bias = self.add_weight(
                shape=(self.units,), initializer=self.bias_initializer
            )
        else:
            self.bias = None

    def call(self, inputs):
        weights = tf.matmul(inputs, self.kernel)
        if self.bias is not None:
            return weights + self.bias
        return weights
I have added my_dumb_init as the default (if the user does not provide one) and made the bias optional with the bias argument. Note that you can use if freely as long as it's not data-dependent. If it is (or depends on tf.Tensor somehow), one has to use the @tf.function decorator, which changes Python's flow to its tensorflow counterpart (e.g. if to tf.cond).

See here for more on autograph; it's very easy to follow.

If you want to incorporate the above initializer changes into your model, you have to create the appropriate object and that's it.

... # Previous part of the Model code here
self.network = tf.keras.Sequential(
    [
        YourDense(100, bias=False, kernel_initializer="lecun_uniform"),
        tf.keras.layers.ReLU(),
        YourDense(10, bias_initializer=tf.initializers.Ones()),
    ]
)
... # and the same afterwards

The same can be done with the built-in tf.keras.layers.Dense layer (argument names differ, but the idea holds).
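For example, a rough equivalent of the two custom layers above using the built-in layer (just a sketch):

tf.keras.layers.Dense(100, use_bias=False, kernel_initializer="lecun_uniform")
tf.keras.layers.Dense(10, bias_initializer=tf.initializers.Ones())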

3.2 Automatic differentiation with tf.GradientTape

3.2.1 Intro

The point of tf.GradientTape is to allow users normal Python control flow and gradient calculation of variables with respect to another variable.

Example taken from here, but broken into separate pieces:

def f(x, y):
  output = 1.0
  for i in range(y):
    if i > 1 and i < 5:
      output = tf.multiply(output, x)
  return output

A regular python function with for and if flow control statements.

def grad(x, y):
  with tf.GradientTape() as t:
    t.watch(x)
    out = f(x, y)
  return t.gradient(out, x)

Using gradient tape you can record all operations on Tensors (including their intermediate states) and "play" them backwards (perform automatic backward differentiation using the chain rule).

Every Tensor within the tf.GradientTape() context manager is recorded automatically. If some Tensor is out of scope, use the watch() method as one can see above.

Finally, the gradient of output with respect to the input x is returned.
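A quick sanity check of the two functions above (here f(x, 6) multiplies by x three times, i.e. x**3, so the gradient at x = 2.0 should be 3 * x**2 = 12.0):

x = tf.constant(2.0)
print(grad(x, 6).numpy())  # 12.0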

3.2.2 Connection with deep learning

What was described above is the backpropagation algorithm. Gradients w.r.t. (with respect to) the output are calculated for each node in the network (or rather for every layer). Those gradients are then used by various optimizers to make corrections, and the process repeats.

Let's continue and assume you have your tf.keras.Model, an optimizer instance, a tf.data.Dataset and a loss function already set up.

One can define a Trainer class which will perform the training for us. Please read the comments in the code carefully if in doubt:

class Trainer:
    def __init__(self, model, optimizer, loss_function):
        self.model = model
        self.loss_function = loss_function
        self.optimizer = optimizer
        # You could pass custom metrics in constructor
        # and adjust train_step and test_step accordingly
        self.train_loss = tf.keras.metrics.Mean(name="train_loss")
        self.test_loss = tf.keras.metrics.Mean(name="test_loss")

    def train_step(self, x, y):
        # Setup tape
        with tf.GradientTape() as tape:
            # Get current predictions of network
            y_pred = self.model(x)
            # Calculate loss generated by predictions
            loss = self.loss_function(y, y_pred)
        # Get gradients of loss w.r.t. EVERY trainable variable (iterable returned)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        # Change trainable variable values according to gradient by applying optimizer policy
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        # Record loss of current step
        self.train_loss(loss)

    def train(self, dataset):
        # For N epochs iterate over dataset and perform train steps each time
        for x, y in dataset:
            self.train_step(x, y)

    def test_step(self, x, y):
        # Record test loss separately
        self.test_loss(self.loss_function(y, self.model(x)))

    def test(self, dataset):
        # Iterate over whole dataset
        for x, y in dataset:
            self.test_step(x, y)

    def __str__(self):
        # You need Python 3.7 with f-string support
        # Just return metrics
        return f"Loss: {self.train_loss.result()}, Test Loss: {self.test_loss.result()}"

Now, you can use this class in your code really simply, like this:

EPOCHS = 5

# model, optimizer, loss defined beforehand
trainer = Trainer(model, optimizer, loss)
for epoch in range(EPOCHS):
    trainer.train(train_dataset) # Same for training and test datasets
    trainer.test(test_dataset)
    print(f"Epoch {epoch}: {trainer}")

The print statement will tell you the training and test loss for each epoch. You can mix training and testing any way you want (e.g. 5 epochs of training and 1 of testing), and you can add different metrics etc.

See here if you want a non-OOP oriented approach (IMO less readable, but to each their own).
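For completeness, a rough non-OOP sketch of the same training step (using the @tf.function decorator mentioned earlier; model, optimizer, loss_function, train_dataset and EPOCHS are assumed to exist as above):

train_loss = tf.keras.metrics.Mean(name="train_loss")

@tf.function  # compiles the step into a graph for speed
def train_step(x, y):
    with tf.GradientTape() as tape:
        loss = loss_function(y, model(x))
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss(loss)

for epoch in range(EPOCHS):
    for x, y in train_dataset:
        train_step(x, y)
    print(f"Epoch {epoch}: {train_loss.result()}")
    train_loss.reset_states()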