How could I use batch normalization in TensorFlow?

I would like to use batch normalization in TensorFlow. I found the related C++ source code in core/ops/nn_ops.cc. However, I did not find it documented on tensorflow.org.

BN has different semantics in MLPs and CNNs, so I am not sure exactly what this BN op does.

I also could not find a method called MovingMoments.

Update July 2016: The easiest way to use batch normalization in TensorFlow is through the higher-level interfaces provided in contrib/layers, tflearn, or slim.
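
For example, here is a minimal sketch (my own illustration, not taken from the docs) of the contrib/layers interface, assuming a build where tf.contrib.layers.batch_norm is available; the input shape, layer size, and placeholder names are made up:

import tensorflow as tf

# Placeholders for the input and the training/inference switch (illustrative names).
x = tf.placeholder(tf.float32, [None, 784])
is_training = tf.placeholder(tf.bool, name='is_training')

# A fully connected layer followed by batch normalization from contrib/layers.
# updates_collections=None makes the moving-average updates run in place,
# so no separate update op has to be fetched at training time.
h = tf.contrib.layers.fully_connected(x, 100, activation_fn=tf.nn.relu)
h = tf.contrib.layers.batch_norm(h, is_training=is_training,
                                 center=True, scale=True,
                                 updates_collections=None)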

Previous answer if you want to DIY: the docstring for this has improved since the release. See the docs comment in the master branch rather than the one you found; it clarifies, in particular, that it is the output from tf.nn.moments.

You can see a very simple example of its use in the batch_norm test code. For a more real-world use example, I have included below the helper class and usage notes that I scribbled up for my own use (no warranty provided!):

"""A helper class for managing batch normalization state.                   

This class is designed to simplify adding batch normalization               
(http://arxiv.org/pdf/1502.03167v3.pdf) to your model by                    
managing the state variables associated with it.                            

Important use note:  The function get_assigner() returns                    
an op that must be executed to save the updated state.                      
A suggested way to do this is to make execution of the                      
model optimizer force it, e.g., by:                                         

  update_assignments = tf.group(bn1.get_assigner(),                         
                                bn2.get_assigner())                         
  with tf.control_dependencies([optimizer]):                                
    optimizer = tf.group(update_assignments)                                

"""

import tensorflow as tf


class ConvolutionalBatchNormalizer(object):
  """Helper class that groups the normalization logic and variables.        

  Use:                                                                      
      ewma = tf.train.ExponentialMovingAverage(decay=0.99)                  
      bn = ConvolutionalBatchNormalizer(depth, 0.001, ewma, True)           
      update_assignments = bn.get_assigner()                                
      x = bn.normalize(y, train=training?)                                  
      (the output x will be batch-normalized).                              
  """

  def __init__(self, depth, epsilon, ewma_trainer, scale_after_norm):
    self.mean = tf.Variable(tf.constant(0.0, shape=[depth]),
                            trainable=False)
    self.variance = tf.Variable(tf.constant(1.0, shape=[depth]),
                                trainable=False)
    self.beta = tf.Variable(tf.constant(0.0, shape=[depth]))
    self.gamma = tf.Variable(tf.constant(1.0, shape=[depth]))
    self.ewma_trainer = ewma_trainer
    self.epsilon = epsilon
    self.scale_after_norm = scale_after_norm

  def get_assigner(self):
    """Returns an EWMA apply op that must be invoked after optimization."""
    return self.ewma_trainer.apply([self.mean, self.variance])

  def normalize(self, x, train=True):
    """Returns a batch-normalized version of x."""
    if train:
      mean, variance = tf.nn.moments(x, [0, 1, 2])
      assign_mean = self.mean.assign(mean)
      assign_variance = self.variance.assign(variance)
      with tf.control_dependencies([assign_mean, assign_variance]):
        return tf.nn.batch_norm_with_global_normalization(
            x, mean, variance, self.beta, self.gamma,
            self.epsilon, self.scale_after_norm)
    else:
      mean = self.ewma_trainer.average(self.mean)
      variance = self.ewma_trainer.average(self.variance)
      local_beta = tf.identity(self.beta)
      local_gamma = tf.identity(self.gamma)
      return tf.nn.batch_norm_with_global_normalization(
          x, mean, variance, local_beta, local_gamma,
          self.epsilon, self.scale_after_norm)

Note that I named it ConvolutionalBatchNormalizer because it pins the use of tf.nn.moments to sum across axes 0, 1, and 2, whereas for non-convolutional use you probably only want axis 0; a sketch of that variant follows below.
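
As a rough sketch of that non-convolutional variant (my own illustration, not part of the class above, and ignoring the moving-average bookkeeping), the only substantive change is the set of axes passed to tf.nn.moments; dense_batch_norm is a hypothetical helper name:

import tensorflow as tf

def dense_batch_norm(x, depth, epsilon=1e-3):
  """Batch-normalizes a 2-D [batch, depth] tensor using per-batch statistics only."""
  beta = tf.Variable(tf.constant(0.0, shape=[depth]))   # learned offset
  gamma = tf.Variable(tf.constant(1.0, shape=[depth]))  # learned scale
  mean, variance = tf.nn.moments(x, [0])  # axis 0 only, rather than [0, 1, 2]
  return tf.nn.batch_normalization(x, mean, variance, beta, gamma, epsilon)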

Feedback would be appreciated if you use it.

The following works fine for me, and it does not require invoking the EMA-apply op externally.

import numpy as np
import tensorflow as tf

def batch_norm(x, n_out, phase_train, scope='bn'):
    """
    Batch normalization on convolutional maps.
    Args:
        x:           Tensor, 4D BHWD input maps
        n_out:       integer, depth of input maps
        phase_train: boolean tf.Variable, true indicates training phase
        scope:       string, variable scope
    Return:
        normed:      batch-normalized maps
    """
    with tf.variable_scope(scope):
        beta = tf.Variable(tf.constant(0.0, shape=[n_out]),
                           name='beta', trainable=True)
        gamma = tf.Variable(tf.constant(1.0, shape=[n_out]),
                            name='gamma', trainable=True)
        batch_mean, batch_var = tf.nn.moments(x, [0,1,2], name='moments')
        ema = tf.train.ExponentialMovingAverage(decay=0.5)

        def mean_var_with_update():
            ema_apply_op = ema.apply([batch_mean, batch_var])
            with tf.control_dependencies([ema_apply_op]):
                return tf.identity(batch_mean), tf.identity(batch_var)

        mean, var = tf.cond(phase_train,
                            mean_var_with_update,
                            lambda: (ema.average(batch_mean), ema.average(batch_var)))
        normed = tf.nn.batch_normalization(x, mean, var, beta, gamma, 1e-3)
    return normed

Example:

import math

n_in, n_out = 3, 16
ksize = 3
stride = 1
phase_train = tf.placeholder(tf.bool, name='phase_train')
input_image = tf.placeholder(tf.float32, name='input_image')
kernel = tf.Variable(tf.truncated_normal([ksize, ksize, n_in, n_out],
                                   stddev=math.sqrt(2.0/(ksize*ksize*n_out))),
                                   name='kernel')
conv = tf.nn.conv2d(input_image, kernel, [1,stride,stride,1], padding='SAME')
conv_bn = batch_norm(conv, n_out, phase_train)
relu = tf.nn.relu(conv_bn)

with tf.Session() as session:
    session.run(tf.initialize_all_variables())
    for i in range(20):
        test_image = np.random.rand(4,32,32,3)
        sess_outputs = session.run([relu],
          {input_image.name: test_image, phase_train.name: True})

Since this was recently edited, I would like to clarify that this is no longer an issue.

The answer above seemed incorrect: when phase_train is set to false, it still updates the EMA mean and variance. This can be verified with the following code snippet.

sess = tf.InteractiveSession()  # default session so the .eval()/.run() calls below work
x = tf.placeholder(tf.float32, [None, 20, 20, 10], name='input')
phase_train = tf.placeholder(tf.bool, name='phase_train')

# generate random noise to pass into batch norm
x_gen = tf.random_normal([50,20,20,10])
pt_false = tf.Variable(tf.constant(True))

#generate a constant variable to pass into batch norm
y = x_gen.eval()

bn = batch_norm(x, 10, phase_train)  # the batch_norm function above returns a single tensor

tf.initialize_all_variables().run()
train_step = lambda: bn.eval({x:x_gen.eval(), phase_train:True})
test_step = lambda: bn.eval({x:y, phase_train:False})
test_step_c = lambda: bn.eval({x:y, phase_train:True})

# Verify that this is different as expected, two different x's have different norms
print(train_step()[0][0][0])
print(train_step()[0][0][0])

# Verify that this is same as expected, same x's (y) have same norm
print(test_step_c()[0][0][0])
print(test_step_c()[0][0][0])

# THIS IS DIFFERENT but should be they same, should only be reading from the ema.
print(test_step()[0][0][0])
print(test_step()[0][0][0])

So here is a simple example of using this batchnorm class:

from bn_class import *

with tf.name_scope('Batch_norm_conv1') as scope:
    ewma = tf.train.ExponentialMovingAverage(decay=0.99)                  
    bn_conv1 = ConvolutionalBatchNormalizer(num_filt_1, 0.001, ewma, True)           
    update_assignments = bn_conv1.get_assigner() 
    a_conv1 = bn_conv1.normalize(a_conv1, train=bn_train) 
    h_conv1 = tf.nn.relu(a_conv1)

There is also an "official" batch normalization layer coded by the developers. They don't have very good docs on how to use it, but here is how to use it (according to me):

import tensorflow as tf
from tensorflow.contrib.layers.python.layers import batch_norm as batch_norm

def batch_norm_layer(x, train_phase, scope_bn):
    bn_train = batch_norm(x, decay=0.999, center=True, scale=True,
                          updates_collections=None,
                          is_training=True,
                          reuse=None,  # is this right?
                          trainable=True,
                          scope=scope_bn)
    bn_inference = batch_norm(x, decay=0.999, center=True, scale=True,
                              updates_collections=None,
                              is_training=False,
                              reuse=True,  # is this right?
                              trainable=True,
                              scope=scope_bn)
    z = tf.cond(train_phase, lambda: bn_train, lambda: bn_inference)
    return z

To actually use it, you need to create a placeholder for train_phase that indicates whether you are in the training or the inference phase (as in train_phase = tf.placeholder(tf.bool, name='phase_train')). Its value can then be filled through the tf.Session during inference or training, as in:

test_error = sess.run(fetches=cross_entropy, feed_dict={x: batch_xtest, y_:batch_ytest, train_phase: False})

or during training:

sess.run(fetches=train_step, feed_dict={x: batch_xs, y_:batch_ys, train_phase: True})

Based on the discussion on GitHub, I am pretty sure this is correct.


There also seems to be another useful link:

http://r2rt.com/implementing-batch-normalization-in-tensorflow.html

Using the built-in batch_norm layer of TensorFlow, below is code to load data, build a network with one hidden ReLU layer and L2 regularization, and introduce batch normalization for both the hidden and the output layer. This runs fine and trains fine. Just FYI, this example is mostly built upon the data and code from the Udacity Deep Learning course. P.S. Yes, parts of it were discussed one way or another in the answers above, but I decided to gather everything into one code snippet so that you have an example of the whole network training process with batch normalization and its evaluation.

# These are all the modules we'll be using later. Make sure you can import them
# before proceeding further.
from __future__ import print_function
import numpy as np
import tensorflow as tf
from six.moves import cPickle as pickle

pickle_file = '/home/maxkhk/Documents/Udacity/DeepLearningCourse/SourceCode/tensorflow/examples/udacity/notMNIST.pickle'

with open(pickle_file, 'rb') as f:
  save = pickle.load(f)
  train_dataset = save['train_dataset']
  train_labels = save['train_labels']
  valid_dataset = save['valid_dataset']
  valid_labels = save['valid_labels']
  test_dataset = save['test_dataset']
  test_labels = save['test_labels']
  del save  # hint to help gc free up memory
  print('Training set', train_dataset.shape, train_labels.shape)
  print('Validation set', valid_dataset.shape, valid_labels.shape)
  print('Test set', test_dataset.shape, test_labels.shape)

image_size = 28
num_labels = 10

def reformat(dataset, labels):
  dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
  # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
  labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
  return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)


def accuracy(predictions, labels):
  return (100.0 * np.sum(np.argmax(predictions, 1) == np.argmax(labels, 1))
          / predictions.shape[0])


#for NeuralNetwork model code is below
#We will use SGD for training to save our time. Code is from Assignment 2
#beta is the new parameter - controls level of regularization.
#Feel free to play with it - the best one I found is 0.001
#notice, we introduce L2 for both biases and weights of all layers

batch_size = 128
beta = 0.001

#building tensorflow graph
graph = tf.Graph()
with graph.as_default():
  # Input data. For the training data, we use a placeholder that will be fed
  # at run time with a training minibatch.
  tf_train_dataset = tf.placeholder(tf.float32,
                                    shape=(batch_size, image_size * image_size))
  tf_train_labels = tf.placeholder(tf.float32, shape=(batch_size, num_labels))
  tf_valid_dataset = tf.constant(valid_dataset)
  tf_test_dataset = tf.constant(test_dataset)

  #introduce batchnorm
  tf_train_dataset_bn = tf.contrib.layers.batch_norm(tf_train_dataset)


  #now let's build our new hidden layer
  #that's how many hidden neurons we want
  num_hidden_neurons = 1024
  #its weights
  hidden_weights = tf.Variable(
    tf.truncated_normal([image_size * image_size, num_hidden_neurons]))
  hidden_biases = tf.Variable(tf.zeros([num_hidden_neurons]))

  #now the layer itself. It multiplies data by weights, adds biases
  #and takes ReLU over result
  hidden_layer = tf.nn.relu(tf.matmul(tf_train_dataset_bn, hidden_weights) + hidden_biases)

  #adding the batch normalization layer
  hidden_layer_bn = tf.contrib.layers.batch_norm(hidden_layer)

  #time to go for output linear layer
  #out weights connect hidden neurons to output labels
  #biases are added to output labels  
  out_weights = tf.Variable(
    tf.truncated_normal([num_hidden_neurons, num_labels]))  

  out_biases = tf.Variable(tf.zeros([num_labels]))  

  #compute output  
  out_layer = tf.matmul(hidden_layer_bn,out_weights) + out_biases
  #our real output is a softmax of prior result
  #and we also compute its cross-entropy to get our loss
  #Notice - we introduce our L2 here
  loss = (tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
    out_layer, tf_train_labels) +
    beta*tf.nn.l2_loss(hidden_weights) +
    beta*tf.nn.l2_loss(hidden_biases) +
    beta*tf.nn.l2_loss(out_weights) +
    beta*tf.nn.l2_loss(out_biases)))

  #now we just minimize this loss to actually train the network
  optimizer = tf.train.GradientDescentOptimizer(0.5).minimize(loss)

  #nice, now let's calculate the predictions on each dataset for evaluating the
  #performance so far
  # Predictions for the training, validation, and test data.
  train_prediction = tf.nn.softmax(out_layer)
  valid_relu = tf.nn.relu(  tf.matmul(tf_valid_dataset, hidden_weights) + hidden_biases)
  valid_prediction = tf.nn.softmax( tf.matmul(valid_relu, out_weights) + out_biases) 

  test_relu = tf.nn.relu( tf.matmul( tf_test_dataset, hidden_weights) + hidden_biases)
  test_prediction = tf.nn.softmax(tf.matmul(test_relu, out_weights) + out_biases)



#now is the actual training on the ANN we built
#we will run it for some number of steps and evaluate the progress after 
#every 500 steps

#number of steps we will train our ANN
num_steps = 3001

#actual training
with tf.Session(graph=graph) as session:
  tf.initialize_all_variables().run()
  print("Initialized")
  for step in range(num_steps):
    # Pick an offset within the training data, which has been randomized.
    # Note: we could use better randomization across epochs.
    offset = (step * batch_size) % (train_labels.shape[0] - batch_size)
    # Generate a minibatch.
    batch_data = train_dataset[offset:(offset + batch_size), :]
    batch_labels = train_labels[offset:(offset + batch_size), :]
    # Prepare a dictionary telling the session where to feed the minibatch.
    # The key of the dictionary is the placeholder node of the graph to be fed,
    # and the value is the numpy array to feed to it.
    feed_dict = {tf_train_dataset : batch_data, tf_train_labels : batch_labels}
    _, l, predictions = session.run(
      [optimizer, loss, train_prediction], feed_dict=feed_dict)
    if (step % 500 == 0):
      print("Minibatch loss at step %d: %f" % (step, l))
      print("Minibatch accuracy: %.1f%%" % accuracy(predictions, batch_labels))
      print("Validation accuracy: %.1f%%" % accuracy(
        valid_prediction.eval(), valid_labels))
      print("Test accuracy: %.1f%%" % accuracy(test_prediction.eval(), test_labels))

You can simply use the built-in batch_norm layer:

batch_norm = tf.cond(is_train,
    lambda: tf.contrib.layers.batch_norm(prev, activation_fn=tf.nn.relu, is_training=True, reuse=None),
    lambda: tf.contrib.layers.batch_norm(prev, activation_fn=tf.nn.relu, is_training=False, reuse=True))

where prev is the output of your previous layer (it can be either a fully connected or a convolutional layer) and is_train is a boolean placeholder. Then just use batch_norm as the input to the next layer; a sketch of wiring this up follows below.
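
For instance, here is a hedged sketch of how the pieces might fit together (the input shape, layer sizes, and scope name are made up for illustration; an explicit scope is given so that reuse=True refers to the variables created by the first branch):

import tensorflow as tf

inputs = tf.placeholder(tf.float32, [None, 784])
is_train = tf.placeholder(tf.bool, name='is_train')

prev = tf.contrib.layers.fully_connected(inputs, 256)  # output of the previous layer
batch_norm = tf.cond(is_train,
    lambda: tf.contrib.layers.batch_norm(prev, activation_fn=tf.nn.relu,
                                         is_training=True, reuse=None, scope='bn'),
    lambda: tf.contrib.layers.batch_norm(prev, activation_fn=tf.nn.relu,
                                         is_training=False, reuse=True, scope='bn'))
logits = tf.contrib.layers.fully_connected(batch_norm, 10, activation_fn=None)

# At run time, feed is_train=True for training steps and is_train=False for evaluation.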

As of TensorFlow 1.0 (February 2017), TensorFlow itself also includes a high-level tf.layers.batch_normalization API.

It is super simple to use:

# Set this to True for training and False for testing
training = tf.placeholder(tf.bool)

x = tf.layers.dense(input_x, units=100)
x = tf.layers.batch_normalization(x, training=training)
x = tf.nn.relu(x)

...except that it adds extra ops to the graph (for updating its mean and variance variables) in such a way that they will not be dependencies of your training op. You can either run the ops separately:

extra_update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
sess.run([train_op, extra_update_ops], ...)

or add the update ops as dependencies of your training op manually, and then just run your training op as normal:

extra_update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.control_dependencies(extra_update_ops):
    train_op = optimizer.minimize(loss)
...
sess.run([train_op], ...)