Poor accuracy of Capsule Network - mistake in the implementation?
I am working on a Capsule Network implementation that is supposed to be customizable. I found a very simple implementation (https://towardsdatascience.com/implementing-capsule-network-in-tensorflow-11e4cca5ecae) and adapted the code to my needs.
However, my code does not reach the test accuracy (on MNIST) that other implementations and the paper "Dynamic Routing Between Capsules" suggest. Is there a possible mistake in the implementation of the Capsule Network? The code uses tf subclassing to create the CapsNet model.
This is the model class:
import tensorflow as tf
import numpy as np
class CapsuleNetwork(tf.keras.Model):
def __init__(self, input_dimension, no_channels, no_of_convs, kernel_sizes, conv_strides, no_of_conv_kernels, no_of_primary_capsules, primary_capsule_vector, no_of_secondary_capsules, secondary_capsule_vector, r):
super(CapsuleNetwork, self).__init__()
self.input_dimension = input_dimension
self.no_channels = no_channels
self.no_of_convs = no_of_convs
self.kernel_sizes = kernel_sizes
self.conv_strides = conv_strides
self.no_of_conv_kernels = no_of_conv_kernels
self.no_of_primary_capsules = no_of_primary_capsules
self.primary_capsule_vector = primary_capsule_vector
self.no_of_secondary_capsules = no_of_secondary_capsules
self.secondary_capsule_vector = secondary_capsule_vector
self.r = r
self.epsilon = 10e-7
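        # note: 10e-7 equals 1e-6, not the 1e-7 epsilon used in the training script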
# calculating the number of primary capsules based on input image size and kernels/strides of the convolution (1152 in the paper)
for i in range(self.no_of_convs):
if i == 0:
self.feature_map_dim = int(((input_dimension-self.kernel_sizes[i])/self.conv_strides[i])+1)
else:
self.feature_map_dim = int(((self.feature_map_dim-self.kernel_sizes[i])/self.conv_strides[i])+1)
self.feature_map_dim = int(((self.feature_map_dim-self.kernel_sizes[-1])/self.conv_strides[-1])+1)
self.no_primary_caps = int(self.feature_map_dim*self.feature_map_dim*self.no_of_primary_capsules)
with tf.name_scope("Variables") as scope:
self.conv_dict = {}
for i in range(no_of_convs):
self.conv_dict[f"convolution{i}"] = tf.keras.layers.Conv2D(self.no_of_conv_kernels, [self.kernel_sizes[i],self.kernel_sizes[i]], strides=[self.conv_strides[i],self.conv_strides[i]], name=f'ConvolutionLayer_{i}', activation='relu')
self.primary_capsule = tf.keras.layers.Conv2D(self.no_of_primary_capsules * self.primary_capsule_vector, [self.kernel_sizes[-1],self.kernel_sizes[-1]], strides=[self.conv_strides[-1],self.conv_strides[-1]], name="PrimaryCapsule")
self.w = tf.Variable(tf.random_normal_initializer()(shape=[1, self.no_primary_caps, self.no_of_secondary_capsules, self.secondary_capsule_vector, self.primary_capsule_vector]), dtype=tf.float32, name="PoseEstimation", trainable=True)
self.dense_1 = tf.keras.layers.Dense(units = 512, activation='relu')
self.dense_2 = tf.keras.layers.Dense(units = 1024, activation='relu')
self.dense_3 = tf.keras.layers.Dense(units = input_dimension*input_dimension*no_channels, activation='sigmoid', dtype='float32')
def print_architecture(self):
print("Model Summary:\n")
print("Input images: " + str(self.input_dimension) + "x" + str(self.input_dimension) + "x" + str(self.no_channels))
print(str(self.no_of_convs) + " conv layers")
print(str(self.no_of_primary_capsules) + " primary capsule channels with " + str(self.no_primary_caps) + " " + str(self.primary_capsule_vector) + "-D capsules")
print(str(self.no_of_secondary_capsules) + " " + str(self.secondary_capsule_vector) + "-D digit capsules")
def build(self, input_shape):
pass
def squash(self, s):
with tf.name_scope("SquashFunction") as scope:
s_norm = tf.norm(s, axis=-1, keepdims=True)
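            # scales s to a length in [0, 1) while preserving its direction (Eq. 1 of the paper)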
return tf.square(s_norm)/(1 + tf.square(s_norm)) * s/(s_norm + self.epsilon)
@tf.function
def call(self, inputs):
input_x, y = inputs
for i in range(self.no_of_convs):
convolution = self.conv_dict.get(f"convolution{i}")
if i == 0:
                x = convolution(input_x) # x.shape: (None, 20, 20, 256); no longer necessarily true for other configurations
else:
x = convolution(x)
x = self.primary_capsule(x) # x.shape: (None, 6, 6, 256)
with tf.name_scope("CapsuleFormation") as scope:
u = tf.reshape(x, (-1, self.no_of_primary_capsules * x.shape[1] * x.shape[2], self.primary_capsule_vector)) # u.shape: (None, 1152, 8)
u = tf.expand_dims(u, axis=-2) # u.shape: (None, 1152, 1, 8)
u = tf.expand_dims(u, axis=-1) # u.shape: (None, 1152, 1, 8, 1)
u_hat = tf.matmul(self.w, u) # u_hat.shape: (None, 1152, 10, 16, 1)
u_hat = tf.squeeze(u_hat, [4]) # u_hat.shape: (None, 1152, 10, 16)
with tf.name_scope("DynamicRouting") as scope:
b = tf.zeros((input_x.shape[0], self.no_primary_caps, self.no_of_secondary_capsules, 1)) # b.shape: (None, 1152, 10, 1)
for i in range(self.r): # self.r = 3
c = tf.nn.softmax(b, axis=-2) # c.shape: (None, 1152, 10, 1)
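                # softmax over the secondary-capsule axis: each primary capsule distributes its output among the 10 digit capsules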
s = tf.reduce_sum(tf.multiply(c, u_hat), axis=1, keepdims=True) # s.shape: (None, 1, 10, 16)
v = self.squash(s) # v.shape: (None, 1, 10, 16)
agreement = tf.squeeze(tf.matmul(tf.expand_dims(u_hat, axis=-1), tf.expand_dims(v, axis=-1), transpose_a=True), [4]) # agreement.shape: (None, 1152, 10, 1)
                # The following intermediate shapes occur inside the matmul; they are not assigned to variables and are listed only for readability.
# u_hat.shape (Intermediate shape) : (None, 1152, 10, 16, 1)
# v.shape (Intermediate shape): (None, 1, 10, 16, 1)
# Since the first parameter of matmul is to be transposed its shape becomes:(None, 1152, 10, 1, 16)
# Now matmul is performed in the last two dimensions, and others are broadcasted
# Before squeezing we have an intermediate shape of (None, 1152, 10, 1, 1)
b += agreement
with tf.name_scope("Masking") as scope:
y = tf.expand_dims(y, axis=-1) # y.shape: (None, 10, 1)
y = tf.expand_dims(y, axis=1) # y.shape: (None, 1, 10, 1)
mask = tf.cast(y, dtype=tf.float32) # mask.shape: (None, 1, 10, 1)
v_masked = tf.multiply(mask, v) # v_masked.shape: (None, 1, 10, 16)
with tf.name_scope("Reconstruction") as scope:
v_ = tf.reshape(v_masked, [-1, self.no_of_secondary_capsules * self.secondary_capsule_vector]) # v_.shape: (None, 160)
reconstructed_image = self.dense_1(v_) # reconstructed_image.shape: (None, 512)
reconstructed_image = self.dense_2(reconstructed_image) # reconstructed_image.shape: (None, 1024)
reconstructed_image = self.dense_3(reconstructed_image) # reconstructed_image.shape: (None, 784)
return v, reconstructed_image
@tf.function
def predict_capsule_output(self, inputs):
for i in range(self.no_of_convs):
convolution = self.conv_dict.get(f"convolution{i}")
if i == 0:
                x = convolution(inputs) # x.shape: (None, 20, 20, 256); no longer necessarily true for other configurations
else:
x = convolution(x)
x = self.primary_capsule(x) # x.shape: (None, 6, 6, 256)
with tf.name_scope("CapsuleFormation") as scope:
u = tf.reshape(x, (-1, self.no_of_primary_capsules * x.shape[1] * x.shape[2], self.primary_capsule_vector)) # u.shape: (None, 1152, 8)
u = tf.expand_dims(u, axis=-2) # u.shape: (None, 1152, 1, 8)
u = tf.expand_dims(u, axis=-1) # u.shape: (None, 1152, 1, 8, 1)
u_hat = tf.matmul(self.w, u) # u_hat.shape: (None, 1152, 10, 16, 1)
u_hat = tf.squeeze(u_hat, [4]) # u_hat.shape: (None, 1152, 10, 16)
with tf.name_scope("DynamicRouting") as scope:
b = tf.zeros((inputs.shape[0], self.no_primary_caps, self.no_of_secondary_capsules, 1)) # b.shape: (None, 1152, 10, 1)
for i in range(self.r): # self.r = 3
c = tf.nn.softmax(b, axis=-2) # c.shape: (None, 1152, 10, 1)
s = tf.reduce_sum(tf.multiply(c, u_hat), axis=1, keepdims=True) # s.shape: (None, 1, 10, 16)
v = self.squash(s) # v.shape: (None, 1, 10, 16)
agreement = tf.squeeze(tf.matmul(tf.expand_dims(u_hat, axis=-1), tf.expand_dims(v, axis=-1), transpose_a=True), [4]) # agreement.shape: (None, 1152, 10, 1)
                # The following intermediate shapes occur inside the matmul; they are not assigned to variables and are listed only for readability.
# u_hat.shape (Intermediate shape) : (None, 1152, 10, 16, 1)
# v.shape (Intermediate shape): (None, 1, 10, 16, 1)
# Since the first parameter of matmul is to be transposed its shape becomes:(None, 1152, 10, 1, 16)
# Now matmul is performed in the last two dimensions, and others are broadcasted
# Before squeezing we have an intermediate shape of (None, 1152, 10, 1, 1)
b += agreement
return v
@tf.function
def regenerate_image(self, inputs):
v, pred_class = inputs
with tf.name_scope("Reconstruction") as scope:
v_ = tf.reshape(v, [-1, self.no_of_secondary_capsules * self.secondary_capsule_vector]) # v_.shape: (None, 160)
reconstructed_image = self.dense_1(v_) # reconstructed_image.shape: (None, 512)
reconstructed_image = self.dense_2(reconstructed_image) # reconstructed_image.shape: (None, 1024)
            reconstructed_image = self.dense_3(reconstructed_image) # reconstructed_image.shape: (None, input_dimension*input_dimension*no_channels), e.g. (None, 784) for MNIST
return reconstructed_image
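As a quick sanity check, the class can be exercised with random data to verify the expected shapes (a minimal sketch of my own, using the hyperparameters of the training script below):
# Quick shape sanity check with random data (a sketch, not part of the training script).
import tensorflow as tf

model = CapsuleNetwork(input_dimension=28, no_channels=1, no_of_convs=1,
                       kernel_sizes=(9, 9), conv_strides=(1, 2),
                       no_of_conv_kernels=256, no_of_primary_capsules=32,
                       primary_capsule_vector=8, no_of_secondary_capsules=10,
                       secondary_capsule_vector=16, r=3)
x = tf.random.uniform((4, 28, 28, 1))    # four fake gray-scale images
y = tf.one_hot([3, 1, 4, 1], depth=10)   # fake one-hot labels for the masking step
v, reconstruction = model([x, y])
print(v.shape)               # expected: (4, 1, 10, 16)
print(reconstruction.shape)  # expected: (4, 784)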
There is also an additional script that uses this class and trains the model. Its main part is:
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from datetime import datetime
import matplotlib
import matplotlib.pyplot as plt
from CapsuleNetworkClassWorkingExample import CapsuleNetwork
print(tf.__version__)
# =============================================================================
# Hyperparameter based on paper
epsilon = 1e-7
m_plus = 0.9
m_minus = 0.1
lambda_ = 0.5
alpha = 0.0005
epochs = 50
no_of_secondary_capsules = 10
batch_size = 64
optimizer = tf.keras.optimizers.Adam()
# parameters for CapsNet architecture
params = {
"input_dimension": 28,
"no_channels": 1, # 3 for rgb or 1 for gray-scale images
"no_of_convs": 1,
"kernel_sizes": (9,9), # kernel sizes of convolutional layers. First entry: kernel of conv0; second entry: kernel of conv1 etc...
"conv_strides": (1,2), # strides of the convolutional layers. see "kernel_sizes"
"no_of_conv_kernels": 256, # number of kernels for all conv layers (including primary capsule convolution layer)
"no_of_primary_capsules": 32, # number of primary capsule channels
"no_of_secondary_capsules": 10, # number of digit capsules (has to be equal to number of classes)
"primary_capsule_vector": 8, # dimension of primary capsules
"secondary_capsule_vector": 16, # dimension of digit capsules
"r":3, # number of routing iterations
}
# =============================================================================
# Save directory for np arrays (losses & acc)
loss_acc_path = './logs/losses_acc'
# Tensorboard Checkpoints
checkpoint_path = './logs/model/capsule'
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = './logs/func/%s' % stamp
writer = tf.summary.create_file_writer(logdir)
scalar_logdir = './logs/scalars/%s' % stamp
file_writer = tf.summary.create_file_writer(scalar_logdir + "/metrics")
# Import MNIST and divide into train, val and test dataset [left out here]
# functions to evaluate loss, and train the model
def safe_norm(v, axis=-1, epsilon=1e-7):
""""
input:
v → digit capsule output of dimension (None, 1, 10, 16) dimension varies for number of labels
axis → integer value
epsilon → needed to not receive NaN
output:
normalization of input v
"""
v_ = tf.reduce_sum(tf.square(v), axis = axis, keepdims=True)
return tf.sqrt(v_ + epsilon)
def loss_function(v, reconstructed_image, y, y_image):
prediction = safe_norm(v)
prediction = tf.reshape(prediction, [-1, no_of_secondary_capsules])
left_margin = tf.square(tf.maximum(0.0, m_plus - prediction))
right_margin = tf.square(tf.maximum(0.0, prediction - m_minus))
l = tf.add(y * left_margin, lambda_ * (1.0 - y) * right_margin)
margin_loss = tf.reduce_mean(tf.reduce_sum(l, axis=-1))
y_image_flat = tf.reshape(y_image, [-1,
params.get("input_dimension")*params.get("input_dimension")*params.get("no_channels")])
#reshape depends on input shape
reconstruction_loss = tf.reduce_mean(tf.square(y_image_flat - reconstructed_image))
loss = tf.add(margin_loss, alpha * reconstruction_loss)
return loss
def train(x,y):
y_one_hot = tf.one_hot(y, depth=10)
with tf.GradientTape() as tape:
v, reconstructed_image = model([x, y_one_hot])
loss = loss_function(v, reconstructed_image, y_one_hot, x)
grad = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grad, model.trainable_variables))
return loss
def predict(model, x):
pred = safe_norm(model.predict_capsule_output(x))
pred = tf.squeeze(pred, [1])
return np.argmax(pred, axis=1)[:,0]
tf.summary.trace_on(graph=True, profiler=True)
# create the model
model = CapsuleNetwork(**params)
model.print_architecture()
_ = train(X_train[:32],y_train[:32])
tf.summary.trace_off()
model.summary()
checkpoint = tf.train.Checkpoint(model=model)
train_losses = []
val_losses = []
val_accuracy = []
for i in range(1, epochs+1, 1):
loss = 0
with tqdm(total=len(train_dataset)) as pbar:
description = "Epoch " + str(i) + "/" + str(epochs)
pbar.set_description_str(description)
for X_batch, y_batch in train_dataset:
loss += train(X_batch,y_batch)
pbar.update(1)
loss /= len(train_dataset)
train_losses.append(loss.numpy())
training_sum = 0
print_statement = "Training Loss :" + str(loss.numpy()) + " Evaluating Validation Loss and Accuracy ..."
pbar.set_postfix_str(print_statement)
epoch_val_loss = []
for X_batch, y_batch in val_dataset:
training_sum += sum(predict(model, X_batch)==y_batch.numpy())
y_one_hot = tf.one_hot(y_batch, depth=10)
v = model.predict_capsule_output(X_batch)
pred_class = predict(model,X_batch)
reconstruction = model.regenerate_image((v,pred_class))
val_loss = loss_function(v,reconstruction,y_one_hot,X_batch)
epoch_val_loss.append(val_loss)
val_accuracy.append(training_sum/val_dataset_size)
val_losses.append(np.mean(epoch_val_loss))
with file_writer.as_default():
tf.summary.scalar('Training Loss', data=loss.numpy(), step=i)
tf.summary.scalar(' Validation Accuracy', data=val_accuracy[-1], step=i)
tf.summary.scalar(' Validation Loss', data=val_losses[-1], step=i)
print_statement = "Train Loss :" + str(loss.numpy()) + "Val Accuracy :" + str(val_accuracy[-1]) + "Val Loss :" + str(val_losses[-1])
if i != 1:
if train_losses[i-1] < np.amin(train_losses[:-1]):
print_statement += ' Checkpoint Saved'
checkpoint.save(checkpoint_path)
else:
print_statement += ' Checkpoint Saved'
checkpoint.save(checkpoint_path)
pbar.set_postfix_str(print_statement)
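For reference, the test accuracy referred to below can be computed analogously to the validation loop (a sketch; test_dataset and test_dataset_size are assumed to exist and are left out here like the rest of the dataset preparation):
# Test accuracy, computed analogously to the validation accuracy above.
# test_dataset and test_dataset_size are assumed to be prepared like the
# omitted train/val datasets.
correct = 0
for X_batch, y_batch in test_dataset:
    correct += sum(predict(model, X_batch) == y_batch.numpy())
print("Test accuracy:", correct / test_dataset_size)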
Now, when I evaluate the test accuracy by predicting the class output for the test dataset, I get about 98% accuracy (depending on the checkpoint). That is roughly 1% less than what other implementations achieve. I really don't understand why that is. Below you can see the learning curves of the model. I hope someone can give me a hint as to why my results might differ from other people's code.
[figure: learning curves]
Without having looked at your code in detail: a 1% difference really is not much in deep learning. The difference can be caused by a different (random) weight initialization or by slightly different gradients that lead to a different learning trajectory. Re-training the network will likely give slightly different results each time.
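If you want to reduce that run-to-run variance, fixing the random seeds before building the model is one option, for example:
# Fixing the seeds before model creation makes runs more comparable
# (a sketch; exact determinism on GPU may additionally require
# tf.config.experimental.enable_op_determinism() in TF >= 2.8).
import numpy as np
import tensorflow as tf

np.random.seed(42)
tf.random.set_seed(42)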