Poor accuracy of Capsule Network - mistake in the implementation?

I am working on a capsule network implementation that is supposed to be customizable. I found a very simple implementation (https://towardsdatascience.com/implementing-capsule-network-in-tensorflow-11e4cca5ecae) and adapted the code to my needs.

However, my code reaches a different accuracy on the test dataset (MNIST) than other implementations and the paper "Dynamic Routing Between Capsules" suggest. Is there a possible mistake in my implementation of the capsule network? The code uses tf subclassing to create the CapsNet model. This is the model class:

import tensorflow as tf
import numpy as np

class CapsuleNetwork(tf.keras.Model):
    def __init__(self, input_dimension, no_channels, no_of_convs, kernel_sizes, conv_strides, no_of_conv_kernels, no_of_primary_capsules, primary_capsule_vector, no_of_secondary_capsules, secondary_capsule_vector, r):
        super(CapsuleNetwork, self).__init__()
        self.input_dimension = input_dimension
        self.no_channels = no_channels
        self.no_of_convs = no_of_convs
        self.kernel_sizes = kernel_sizes
        self.conv_strides = conv_strides
        self.no_of_conv_kernels = no_of_conv_kernels
        self.no_of_primary_capsules = no_of_primary_capsules
        self.primary_capsule_vector = primary_capsule_vector
        self.no_of_secondary_capsules = no_of_secondary_capsules
        self.secondary_capsule_vector = secondary_capsule_vector
        self.r = r
        self.epsilon = 10e-7

        # calculating the number of primary capsules based on the input image size and the kernels/strides of the convolutions (1152 in the paper)
        for i in range(self.no_of_convs):
            if i == 0:
                self.feature_map_dim = int(((input_dimension-self.kernel_sizes[i])/self.conv_strides[i])+1)
            else:
                self.feature_map_dim = int(((self.feature_map_dim-self.kernel_sizes[i])/self.conv_strides[i])+1)

        # the last kernel/stride entry belongs to the primary capsule convolution
        self.feature_map_dim = int(((self.feature_map_dim-self.kernel_sizes[-1])/self.conv_strides[-1])+1)
        self.no_primary_caps = int(self.feature_map_dim*self.feature_map_dim*self.no_of_primary_capsules)

        with tf.name_scope("Variables") as scope:
            self.conv_dict = {}
            for i in range(no_of_convs):
                self.conv_dict[f"convolution{i}"] = tf.keras.layers.Conv2D(self.no_of_conv_kernels, [self.kernel_sizes[i],self.kernel_sizes[i]], strides=[self.conv_strides[i],self.conv_strides[i]], name=f'ConvolutionLayer_{i}', activation='relu')
            self.primary_capsule = tf.keras.layers.Conv2D(self.no_of_primary_capsules * self.primary_capsule_vector, [self.kernel_sizes[-1],self.kernel_sizes[-1]], strides=[self.conv_strides[-1],self.conv_strides[-1]], name="PrimaryCapsule")
            self.w = tf.Variable(tf.random_normal_initializer()(shape=[1, self.no_primary_caps, self.no_of_secondary_capsules, self.secondary_capsule_vector, self.primary_capsule_vector]), dtype=tf.float32, name="PoseEstimation", trainable=True)
            self.dense_1 = tf.keras.layers.Dense(units = 512, activation='relu')
            self.dense_2 = tf.keras.layers.Dense(units = 1024, activation='relu')
            self.dense_3 = tf.keras.layers.Dense(units = input_dimension*input_dimension*no_channels, activation='sigmoid', dtype='float32')
        
    def print_architecture(self):
        print("Model Summary:\n")
        print("Input images: " + str(self.input_dimension) + "x" + str(self.input_dimension) + "x" + str(self.no_channels))
        print(str(self.no_of_convs) + " conv layers")
        print(str(self.no_of_primary_capsules) + " primary capsule channels with " + str(self.no_primary_caps) + " " + str(self.primary_capsule_vector) + "-D capsules")
        print(str(self.no_of_secondary_capsules) + " " + str(self.secondary_capsule_vector) + "-D digit capsules")

    def build(self, input_shape):
        # all weights are created in __init__, so there is nothing to do here
        pass

    def squash(self, s):
        # squash non-linearity from the paper: v = ||s||^2/(1+||s||^2) * s/||s||
        with tf.name_scope("SquashFunction") as scope:
            s_norm = tf.norm(s, axis=-1, keepdims=True)
            return tf.square(s_norm)/(1 + tf.square(s_norm)) * s/(s_norm + self.epsilon)

    @tf.function
    def call(self, inputs):
        input_x, y = inputs

        for i in range(self.no_of_convs):
            convolution = self.conv_dict.get(f"convolution{i}")
            if i == 0:
                x = convolution(input_x) # x.shape: (None, 20, 20, 256) for the paper's configuration; no longer necessarily true for other kernels/strides
            else:
                x = convolution(x)
        x = self.primary_capsule(x) # x.shape: (None, 6, 6, 256)

        with tf.name_scope("CapsuleFormation") as scope:
            u = tf.reshape(x, (-1, self.no_of_primary_capsules * x.shape[1] * x.shape[2], self.primary_capsule_vector)) # u.shape: (None, 1152, 8)
            u = tf.expand_dims(u, axis=-2) # u.shape: (None, 1152, 1, 8)
            u = tf.expand_dims(u, axis=-1) # u.shape: (None, 1152, 1, 8, 1)
            u_hat = tf.matmul(self.w, u) # u_hat.shape: (None, 1152, 10, 16, 1)
            u_hat = tf.squeeze(u_hat, [4]) # u_hat.shape: (None, 1152, 10, 16)

        with tf.name_scope("DynamicRouting") as scope:
            b = tf.zeros((input_x.shape[0], self.no_primary_caps, self.no_of_secondary_capsules, 1)) # b.shape: (None, 1152, 10, 1)
            for i in range(self.r): # self.r = 3
                c = tf.nn.softmax(b, axis=-2) # c.shape: (None, 1152, 10, 1)
                s = tf.reduce_sum(tf.multiply(c, u_hat), axis=1, keepdims=True) # s.shape: (None, 1, 10, 16)
                v = self.squash(s) # v.shape: (None, 1, 10, 16)
                agreement = tf.squeeze(tf.matmul(tf.expand_dims(u_hat, axis=-1), tf.expand_dims(v, axis=-1), transpose_a=True), [4]) # agreement.shape: (None, 1152, 10, 1)
                # The following intermediate shapes occur inside the matmul above; they are not assigned to variables and are noted only to aid understanding:
                # u_hat (intermediate): (None, 1152, 10, 16, 1)
                # v (intermediate): (None, 1, 10, 16, 1)
                # Since the first argument of matmul is transposed, its shape becomes (None, 1152, 10, 1, 16).
                # matmul then operates on the last two dimensions; the others are broadcast.
                # Before squeezing, the result has shape (None, 1152, 10, 1, 1).
                b += agreement

        with tf.name_scope("Masking") as scope:
            y = tf.expand_dims(y, axis=-1) # y.shape: (None, 10, 1)
            y = tf.expand_dims(y, axis=1) # y.shape: (None, 1, 10, 1)
            mask = tf.cast(y, dtype=tf.float32) # mask.shape: (None, 1, 10, 1)
            v_masked = tf.multiply(mask, v) # v_masked.shape: (None, 1, 10, 16)

        with tf.name_scope("Reconstruction") as scope:
            v_ = tf.reshape(v_masked, [-1, self.no_of_secondary_capsules * self.secondary_capsule_vector]) # v_.shape: (None, 160)
            reconstructed_image = self.dense_1(v_) # reconstructed_image.shape: (None, 512)
            reconstructed_image = self.dense_2(reconstructed_image) # reconstructed_image.shape: (None, 1024)
            reconstructed_image = self.dense_3(reconstructed_image) # reconstructed_image.shape: (None, 784)
        return v, reconstructed_image

    @tf.function
    def predict_capsule_output(self, inputs):
        for i in range(self.no_of_convs):
            convolution = self.conv_dict.get(f"convolution{i}")
            if i == 0:
                x = convolution(inputs) # x.shape: (None, 20, 20, 256) for the paper's configuration; no longer necessarily true for other kernels/strides
            else:
                x = convolution(x)
        x = self.primary_capsule(x) # x.shape: (None, 6, 6, 256)

        with tf.name_scope("CapsuleFormation") as scope:
            u = tf.reshape(x, (-1, self.no_of_primary_capsules * x.shape[1] * x.shape[2], self.primary_capsule_vector)) # u.shape: (None, 1152, 8)
            u = tf.expand_dims(u, axis=-2) # u.shape: (None, 1152, 1, 8)
            u = tf.expand_dims(u, axis=-1) # u.shape: (None, 1152, 1, 8, 1)
            u_hat = tf.matmul(self.w, u) # u_hat.shape: (None, 1152, 10, 16, 1)
            u_hat = tf.squeeze(u_hat, [4]) # u_hat.shape: (None, 1152, 10, 16)

        with tf.name_scope("DynamicRouting") as scope:
            b = tf.zeros((inputs.shape[0], self.no_primary_caps, self.no_of_secondary_capsules, 1)) # b.shape: (None, 1152, 10, 1)
            for i in range(self.r): # self.r = 3
                c = tf.nn.softmax(b, axis=-2) # c.shape: (None, 1152, 10, 1)
                s = tf.reduce_sum(tf.multiply(c, u_hat), axis=1, keepdims=True) # s.shape: (None, 1, 10, 16)
                v = self.squash(s) # v.shape: (None, 1, 10, 16)
                agreement = tf.squeeze(tf.matmul(tf.expand_dims(u_hat, axis=-1), tf.expand_dims(v, axis=-1), transpose_a=True), [4]) # agreement.shape: (None, 1152, 10, 1)
                # see call() above for the intermediate shapes involved in this matmul
                b += agreement
        return v

    @tf.function
    def regenerate_image(self, inputs):
        v, pred_class = inputs
        with tf.name_scope("Reconstruction") as scope:
            v_ = tf.reshape(v, [-1, self.no_of_secondary_capsules * self.secondary_capsule_vector]) # v_.shape: (None, 160)
            reconstructed_image = self.dense_1(v_) # reconstructed_image.shape: (None, 512)
            reconstructed_image = self.dense_2(reconstructed_image) # reconstructed_image.shape: (None, 1024)
            reconstructed_image = self.dense_3(reconstructed_image) # reconstructed_image.shape: (None, input_dimension*input_dimension*no_channels), i.e. (None, 784) for MNIST

        return reconstructed_image

There is also an additional script that uses this class and trains the model. These are the main parts:

import numpy as np
from tqdm import tqdm
import tensorflow as tf
from datetime import datetime

import matplotlib
import matplotlib.pyplot as plt

from CapsuleNetworkClassWorkingExample import CapsuleNetwork

print(tf.__version__)

# =============================================================================
# Hyperparameter based on paper
epsilon = 1e-7
m_plus = 0.9
m_minus = 0.1
lambda_ = 0.5
alpha = 0.0005
epochs = 50
no_of_secondary_capsules = 10
batch_size = 64

optimizer = tf.keras.optimizers.Adam()

# parameters for CapsNet architecture
params = {
"input_dimension": 28,
"no_channels": 1,                 # 3 for rgb or 1 for gray-scale images
"no_of_convs": 1,
"kernel_sizes": (9,9),            # kernel sizes of convolutional layers. First entry: kernel of conv0; second entry: kernel of conv1 etc...
"conv_strides": (1,2),            # strides of the convolutional layers. see "kernel_sizes"
"no_of_conv_kernels": 256,        # number of kernels for all conv layers (including primary capsule convolution layer)
"no_of_primary_capsules": 32,     # number of primary capsule channels
"no_of_secondary_capsules": 10,   # number of digit capsules (has to be equal to number of classes)
"primary_capsule_vector": 8,      # dimension of primary capsules
"secondary_capsule_vector": 16,   # dimension of digit capsules 
"r":3,                            # number of routing iterations
}
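
As a quick sanity check for these parameters, here is a minimal sketch that mirrors the feature-map arithmetic from `__init__` and computes the expected number of primary capsules (the helper name `count_primary_capsules` is made up here, not part of the code above); with the paper's configuration it comes out to 6*6*32 = 1152:

def count_primary_capsules(input_dimension, kernel_sizes, conv_strides, no_of_primary_capsules):
    # valid-padding convolution: out = floor((in - kernel) / stride) + 1,
    # applied for every conv layer including the primary capsule convolution (last entry)
    dim = input_dimension
    for k, s in zip(kernel_sizes, conv_strides):
        dim = (dim - k) // s + 1
    return dim * dim * no_of_primary_capsules

print(count_primary_capsules(28, (9, 9), (1, 2), 32))  # 28 -> 20 -> 6, so 6*6*32 = 1152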
# =============================================================================
# Save directory for np arrays (losses & acc)
loss_acc_path = './logs/losses_acc'
# Tensorboard Checkpoints
checkpoint_path = './logs/model/capsule'

stamp = datetime.now().strftime("%Y%m%d-%H%M%S")

logdir = './logs/func/%s' % stamp
writer = tf.summary.create_file_writer(logdir)

scalar_logdir = './logs/scalars/%s' % stamp
file_writer = tf.summary.create_file_writer(scalar_logdir + "/metrics")

# Import MNIST and divide into train, val and test dataset [left out here]

# functions to evaluate the loss and train the model
def safe_norm(v, axis=-1, epsilon=1e-7):
    """
    input:
        v → digit capsule output of dimension (None, 1, 10, 16); the dimension varies with the number of labels
        axis → integer value
        epsilon → needed to avoid NaN
    output:
        norm of the input v
    """
    v_ = tf.reduce_sum(tf.square(v), axis = axis, keepdims=True)
    return tf.sqrt(v_ + epsilon)

def loss_function(v, reconstructed_image, y, y_image):
    prediction = safe_norm(v)
    prediction = tf.reshape(prediction, [-1, no_of_secondary_capsules])    

    left_margin = tf.square(tf.maximum(0.0, m_plus - prediction))
    right_margin = tf.square(tf.maximum(0.0, prediction - m_minus))
    
    l = tf.add(y * left_margin, lambda_ * (1.0 - y) * right_margin)

    margin_loss = tf.reduce_mean(tf.reduce_sum(l, axis=-1))

    # the reshape depends on the input shape
    y_image_flat = tf.reshape(y_image, [-1, params.get("input_dimension")*params.get("input_dimension")*params.get("no_channels")])
    reconstruction_loss = tf.reduce_mean(tf.square(y_image_flat - reconstructed_image))

    loss = tf.add(margin_loss, alpha * reconstruction_loss)

    return loss

def train(x,y):
    y_one_hot = tf.one_hot(y, depth=10)
    with tf.GradientTape() as tape:
        v, reconstructed_image = model([x, y_one_hot])
        loss = loss_function(v, reconstructed_image, y_one_hot, x)
    grad = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grad, model.trainable_variables))
    return loss

def predict(model, x):
    pred = safe_norm(model.predict_capsule_output(x))
    pred = tf.squeeze(pred, [1])
    return np.argmax(pred, axis=1)[:,0]

tf.summary.trace_on(graph=True, profiler=True)

# create the model
model = CapsuleNetwork(**params)

model.print_architecture()

_ = train(X_train[:32],y_train[:32])

tf.summary.trace_off()
model.summary()

checkpoint = tf.train.Checkpoint(model=model)

train_losses = []
val_losses = []
val_accuracy = []
for i in range(1, epochs+1, 1):

    loss = 0
    with tqdm(total=len(train_dataset)) as pbar:
    
        description = "Epoch " + str(i) + "/" + str(epochs)
        pbar.set_description_str(description)

        for X_batch, y_batch in train_dataset:

            loss += train(X_batch,y_batch)
            pbar.update(1)

        loss /= len(train_dataset)
        train_losses.append(loss.numpy())
    
        training_sum = 0

        print_statement = "Training Loss :" + str(loss.numpy()) + " Evaluating Validation Loss and Accuracy ..."
        pbar.set_postfix_str(print_statement)
    
        epoch_val_loss = []
        for X_batch, y_batch in val_dataset:
            training_sum += sum(predict(model, X_batch)==y_batch.numpy())
            y_one_hot = tf.one_hot(y_batch, depth=10)
            v = model.predict_capsule_output(X_batch)
            pred_class = predict(model,X_batch)
            reconstruction = model.regenerate_image((v,pred_class))
            val_loss = loss_function(v,reconstruction,y_one_hot,X_batch)
            epoch_val_loss.append(val_loss)
        val_accuracy.append(training_sum/val_dataset_size)
        val_losses.append(np.mean(epoch_val_loss))
    
        with file_writer.as_default():
            tf.summary.scalar('Training Loss', data=loss.numpy(), step=i)
            tf.summary.scalar('Validation Accuracy', data=val_accuracy[-1], step=i)
            tf.summary.scalar('Validation Loss', data=val_losses[-1], step=i)
    
        print_statement = "Train Loss: " + str(loss.numpy()) + " Val Accuracy: " + str(val_accuracy[-1]) + " Val Loss: " + str(val_losses[-1])
    
        if i != 1:
            if train_losses[i-1] < np.amin(train_losses[:-1]):
                print_statement += ' Checkpoint Saved'
                checkpoint.save(checkpoint_path)
        else:
            print_statement += ' Checkpoint Saved'
            checkpoint.save(checkpoint_path)
        
    
        pbar.set_postfix_str(print_statement)

Now, when I evaluate the test accuracy by predicting the class output for the test dataset, I get an accuracy of about 98% (depending on the checkpoint). That is roughly 1% less than what other implementations achieve. I really don't understand why that is. Below you can see the learning curves of the model. I hope someone can give me a hint as to why my results might differ from other people's code.

[Figure: Learning Curves]
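
For reference, a minimal sketch of the kind of test evaluation meant above, using the predict function from the script (X_test and y_test are assumed NumPy arrays; these names do not appear in the code above):

# batched test-set evaluation with the predict() helper defined above
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(batch_size)
correct = 0
for X_batch, y_batch in test_dataset:
    # predict() returns the index of the longest digit capsule per sample
    correct += np.sum(predict(model, X_batch) == y_batch.numpy())
print("Test accuracy:", correct / len(X_test))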

Without having looked at your code in detail: a difference of 1% really isn't much in deep learning. It can be caused by a different (random) weight initialization or by slightly different gradients that lead to a different learning trajectory. Re-training the network will likely give slightly different results each time.
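
One way to check how much of the gap is plain run-to-run variance is to fix the random seeds before building the model and train several times; a minimal sketch (note that some GPU ops can remain nondeterministic even with fixed seeds):

import numpy as np
import tensorflow as tf

# makes weight initialization and dataset shuffling repeatable across runs
np.random.seed(42)
tf.random.set_seed(42)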