Theano 使用带有小批量的分类交叉熵(负对数似然)

Theano using categorical crossentropy (negative log-likelihood) with minibatches

在分类网络上使用小批量时,我正在尝试使用 T.nnet.categorical_crossentropy 作为成本函数。我无法将输入标签(例如 Y 数据)传递给成本函数。

看一看:

"""
batch_size = 10
n_classes = 26
input_labels.shape = (10, 26)
"""
err_func = T.nnet.categorical_crossentropy

cost = err_func(output_layer, input_labels)
grads = T.grad(cost, params)

TypeError: cost must be a scalar.


cost = err_func(output_layer, np.array([T.arange(input_labels.shape[0]), input_labels]))
theano.tensor.var.AsTensorError: ("Cannot convert [ARange{dtype='int64'}.0 Y] to TensorType", <class 'numpy.ndarray'>)

cost = err_func(output_layer, theano.shared(np.array([T.arange(input_labels.shape[0]), input_labels])))
AttributeError: 'SharedVariable' object has no attribute 'ndim'

cost = err_func(output_layer, np.array([np.arange(input_labels.shape[0]), input_labels]))
TypeError: a float is required

大部分其他代码,如果需要的话:

import theano
from theano import tensor as T
# from theano.tensor.signal.downsample import max_pool_2d  # deprecated
from theano.tensor.signal.pool import pool_2d
import numpy as np
import logging
from matplotlib import pyplot as plt
from net.net_utils import init_rand_weights, init_zero_weights


class ConvNet:
    def __init__(self, layers, err_func, backprop_func, backprop_params,
                 l_rate, batch_size=10):
        """
        :param layers:
        :param err_func: cost/error function
        :param backprop_func: backpropagation function
        :param backprop_params: parameters to pass to backprop function
        :param l_rate: learning rate
        :param batch_size: (mini-) batch size. In comparison to regular nets
        :return:
        """
        self.batch_size = batch_size

        logging.info('\tConstructing ANN with %s layers. Learning rate: %s. Batch size: %s ',
                     len(layers), l_rate, batch_size)
        params = []  # Regular weights and bias weights; e.g. everything to be adjusted during training
        for layer in layers:
            for param in layer.params:
                params.append(param)
        logging.info('\tNumber of parameters to train: %s',
                     sum(param.get_value(borrow=True, return_internal_type=True).size for param in params))

        input_data = T.fmatrix('X')
        input_labels = T.fmatrix('Y')

        layers[0].activate(input_data, batch_size)
        for i in range(1, len(layers)):
            prev_layer = layers[i-1]
            current_layer = layers[i]
            current_layer.activate(prev_layer.output(), batch_size)

        output_layer = layers[-1].output_values

        #cost = err_func(output_layer, input_labels)
        cost = err_func(output_layer, theano.shared(np.array([T.arange(input_labels.shape[0]), input_labels])))

        updates = backprop_func(cost, params, l_rate, **backprop_params)

        prediction = T.argmax(output_layer, axis=1)
        prediction_value = T.max(output_layer, axis=1)

        logging.debug('\tConstructing functions ...')
        self.trainer = theano.function(
            inputs=[input_data, input_labels],
            outputs=cost,
            updates=updates,
            name='Trainer',
            allow_input_downcast=True  # Allows float64 to be casted as float32, which is necessary in order to use GPU
        )
        self.predictor = theano.function(
            inputs=[input_data],
            outputs={'char_as_int': prediction,
                     'char_probability': prediction_value,
                     'output_layer': output_layer},
            name='Predictor',
            allow_input_downcast=True
        )

    def train_and_test(self, train_x, train_y, test_x, test_y, epochs=200, plot=True):
        assert len(train_x) == len(train_y) and len(test_x) == len(test_y), \
            ("Training", len(train_x), len(train_y), "or testing", len(test_x), len(test_y),
             "data sets does not have the same amount of data as classifications")
        logging.info('\tTraining and testing for %s epochs ...', epochs)

        """
        Both training and testing is done in batches for increased speed and to avoid running out of memory.
        If len(train_x) % self.batch size != 0 then some training examples will not be trained on. The same applies to
        testing.
        """
        n_training_batches = len(train_x) // self.batch_size
        n_testing_batches = len(test_x) // self.batch_size
        train_success_rates = []
        test_success_rates = []
        for i in range(epochs):
            for j in range(n_training_batches):
                x_cases = train_x[j*self.batch_size:(j+1)*self.batch_size]
                y_cases = train_y[j*self.batch_size:(j+1)*self.batch_size]
                self.trainer(x_cases, y_cases)

            # Get success rate on training and test data set
            tr_result = np.zeros(shape=(n_training_batches*self.batch_size))
            te_result = np.zeros(shape=(n_testing_batches*self.batch_size))
            for k in range(n_training_batches):
                batch = train_x[k*self.batch_size:(k+1)*self.batch_size]
                tr_result[k*self.batch_size:(k+1)*self.batch_size] = self.predictor(batch)['char_as_int']
            for l in range(n_testing_batches):
                batch = test_x[l*self.batch_size:(l+1)*self.batch_size]
                te_result[l*self.batch_size:(l+1)*self.batch_size] = self.predictor(batch)['char_as_int']
                # logging.debug('\t\t\t\t L:%s:%s / %s, batch size %s', l, l+self.batch_size, len(test_x), len(batch))
            # todo: verify that the length of each comparison result set is equal,
            # and that the sets are equally 'full' (no missing values)
            tr_success_rate = np.mean(np.argmax(train_y[:n_training_batches*self.batch_size], axis=1) == tr_result)
            te_success_rate = np.mean(np.argmax(test_y[:n_testing_batches*self.batch_size], axis=1) == te_result)
            train_success_rates.append(tr_success_rate)
            test_success_rates.append(te_success_rate)

            if i % (epochs / 20) == 0:
                logging.info('\t\tProgress: %s%% | Epoch: %s | Success rate (training, test): %s, %s',
                             (i / epochs)*100, i,
                             "{:.4f}".format(max(train_success_rates)), "{:.4f}".format(max(test_success_rates)))

        logging.info('\tMax success rate (training | test): %s | %s',
                     "{:.4f}".format(max(train_success_rates)), "{:.4f}".format(max(test_success_rates)))
        if plot:
            plt.title('Convolutional Pooled Net')
            plt.plot(train_success_rates)
            plt.plot(test_success_rates)
            plt.legend(['Train', 'Test'], loc="best")
            plt.grid(True)
            plt.yticks(np.arange(0, 1, 0.05))
            plt.show()

    def predict(self, input_x):
        return self.predictor(input_x)


class FullyConnectedLayer:
    def __init__(self, n_in, n_out, act_func):
        """
        Generate a fully connected layer with 1 bias node simulated upstream
        :param act_func: the activation function of the layer
        """
        self.n_in = n_in
        self.n_out = n_out
        self.act_func = act_func
        self.weights = init_rand_weights((n_in, n_out), "w")
        self.bias_weights = init_rand_weights((n_out,), "b")
        self.params = [self.weights, self.bias_weights]
        self.output_values = None

    def activate(self, input_values, batch_size):
        """
        :param input_values: the output from the upstream layer (which is input to this layer)
        :param batch_size:
        :return:
        """
        input_values = input_values.reshape((batch_size, self.n_in))
        self.output_values = self.act_func(T.dot(input_values, self.weights) + self.bias_weights)

    def output(self):
        assert self.output_values is not None, 'Asking for output before activating layer'
        return self.output_values


class SoftMaxLayer(FullyConnectedLayer):
    def __init__(self, n_in, n_out):
        super(SoftMaxLayer, self).__init__(n_in, n_out, T.nnet.softmax)
        # todo find out how to initialize softmaxlayer weights as zero, and if it gives better results/faster training


class ConvPoolLayer:
    conv_func = staticmethod(T.nnet.conv2d)
    pool_func = staticmethod(pool_2d)

    def __init__(self, input_shape, n_feature_maps, act_func,
                 local_receptive_field_size=(5, 5), pool_size=(2, 2), pool_mode='max'):
        """
        Generate a convolutional and a subsequent pooling layer with one bias node for each channel in the pooling layer.
        :param input_shape: tuple(batch size, input channels, input rows, input columns) where
            input_channels = number of feature maps in upstream layer
            input rows, input columns = output size of upstream layer
        :param n_feature_maps: number of feature maps/filters in this layer
        :param local_receptive_field_size: (filter rows, filter columns) = size of local receptive field
        :param pool_size: (rows, columns)
        :param act_func: activation function to be applied to the output from the pooling layer
        :param init_weight_func:
        :param init_bias_weight_func:
        """
        self.input_shape = input_shape
        self.n_feature_maps = n_feature_maps
        self.filter_shape = (n_feature_maps, input_shape[1]) + local_receptive_field_size
        self.local_receptive_field_size = local_receptive_field_size
        self.act_func = act_func
        self.pool_size = pool_size
        self.pool_mode = pool_mode
        self.weights = init_rand_weights(self.filter_shape, "conv2poolWeights")
        self.bias_weights = init_rand_weights((n_feature_maps,), "conv2poolBiasWeights")
        self.params = [self.weights, self.bias_weights]
        self.output_values = None

    def activate(self, input_values, *args):
        """
        :param input_values: the output from the upstream layer (which is input to this layer)
        :return:
        """
        input_values = input_values.reshape(self.input_shape)
        conv = self.conv_func(
            input=input_values,
            input_shape=self.input_shape,
            filters=self.weights,
            filter_shape=self.filter_shape
        )
        pooled = self.pool_func(
            input=conv,
            ds=self.pool_size,
            ignore_border=True,  # If the pool size does not evenly divide the input,
                                 # then ignoring the border will pool from padded zeros.
                                 # This is usually the desired behaviour when max pooling.
            # st=(1,1) # Stride size. Defaults to pool size, e.g. non-overlapping pooling regions
            mode=self.pool_mode  # ‘max’, ‘sum’, ‘average_inc_pad’ or ‘average_exc_pad’
        )
        self.output_values = self.act_func(pooled + self.bias_weights.dimshuffle('x', 0, 'x', 'x'))

    def output(self):
        assert self.output_values is not None, 'Asking for output before activating layer'
        return self.output_values

    @property
    def output_shape(self):
        batch_size = self.input_shape[0]
        if self.local_receptive_field_size[0] != self.local_receptive_field_size[1] \
                or self.pool_size[0] != self.pool_size[1]:
            raise NotImplementedError("I don't know how to calculate output shape when the local receptive field,",
                                      self.local_receptive_field_size, ", or the pool,",
                                      self.pool_size, ", is non-square")
        after_conv = self.input_shape[2] - self.local_receptive_field_size[0]
        after_pool = int(np.ceil(after_conv/2.0))


  shape = (batch_size, self.n_feature_maps, after_pool, after_pool)
    return shape

这有效,但我不确定它的最佳程度:

cost = T.mean(T.nnet.categorical_crossentropy(output_layer, input_labels))