人工神经网络反向传播的全矩阵方法
Full-matrix approach to backpropagation in Artificial Neural Network
我最近正在学习人工神经网络 (ANN),并且有一个代码可以工作,并且 运行 在 Python 中基于小批量训练进行相同的编码。我遵循了 Michael Nilson's Neural Networks and Deep Learning 的书,其中为初学者逐步解释了每个算法。
还有一个完整的手写数字识别代码,对我来说也很好用。
但是,我正在尝试通过将整个小批量传递在一起以通过矩阵形式的反向传播进行训练来稍微调整代码。我也为此开发了一个工作代码,但代码在 运行 时执行得非常慢。有什么方法可以实现基于全矩阵的基于反向传播算法的网络小批量学习方法?
import numpy as np
import pandas as pd
class Network:
def __init__(self, sizes):
self.layers = len(sizes)
self.sizes = sizes
self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
self.weights = [np.random.randn(y, x) for y, x in zip(sizes[1:], sizes[:-1])]
def feed_forward(self, a):
for w, b in zip(self.weights, self.biases):
a = sigmoid(np.dot(w,a) + b)
return a
# Calculate the cost derivative (Gradient of C w.r.t. 'a' - Nabla C(a))
def cost_derivative(self, output_activation, y):
return (output_activation - y)
def update_mini_batch(self, mini_batch, eta):
from scipy.linalg import block_diag
n = len(mini_batch)
xs = [x for x, y in mini_batch]
features = block_diag(*xs)
ys = [y for x, y in mini_batch]
responses = block_diag(*ys)
ws = [a for a in self.weights for i in xrange(n)]
new_list = []
k = 0
while (k < len(ws)):
new_list.append(ws[k: k + n])
k += n
weights = [block_diag(*elems) for elems in new_list]
bs = [b for b in self.biases for i in xrange(n)]
new_list2 = []
j = 0
while (j < len(bs)):
new_list2.append(bs[j : j + n])
j += n
biases = [block_diag(*elems) for elems in new_list2]
baises_dim_1 = [np.dot(np.ones((n*b.shape[0], b.shape[0])), b) for b in self.biases]
biases_dim_2 = [np.dot(b, np.ones((b.shape[1], n*b.shape[1]))) for b in baises_dim_1]
weights_dim_1 = [np.dot(np.ones((n*w.shape[0], w.shape[0])), w) for w in self.weights]
weights_dim_2 = [np.dot(w, np.ones((w.shape[1], n*w.shape[1]))) for w in weights_dim_1]
nabla_b = [np.zeros(b.shape) for b in biases_dim_2]
nabla_w = [np.zeros(w.shape) for w in weights_dim_2]
delta_b = [np.zeros(b.shape) for b in self.biases]
delta_w = [np.zeros(w.shape) for w in self.weights]
zs = []
activation = features
activations = [features]
for w, b in zip(weights, biases):
z = np.dot(w, activation) + b
zs.append(z)
activation = sigmoid(z)
activations.append(activation)
delta = self.cost_derivative(activations[-1], responses) * sigmoid_prime(zs[-1])
nabla_b[-1] = delta
nabla_w[-1] = np.dot(delta, activations[-2].transpose())
for l in xrange(2, self.layers):
z = zs[-l] # the weighted input for that layer
activation_prime = sigmoid_prime(z) # the derivative of activation for the layer
delta = np.dot(weights[-l + 1].transpose(), delta) * activation_prime # calculate the adjustment term (delta) for that layer
nabla_b[-l] = delta # calculate the bias adjustments - by means of using eq-BP3.
nabla_w[-l] = np.dot(delta, activations[-l-1].transpose()) # calculate the weight adjustments - by means of using eq-BP4.
delta_b = [self.split_cases(b, n) for b in nabla_b]
delta_w = [self.split_cases(w, n) for w in nabla_w]
self.weights = [w - (eta/n) * nw for w, nw in zip(self.weights, delta_w)]
self.biases = [b - (eta/ n) * nb for b, nb in zip(self.biases, delta_b)]
def split_cases(self, mat, mini_batch_size):
i = 0
j = 0
dim1 = mat.shape[0]/mini_batch_size
dim2 = mat.shape[1]/mini_batch_size
sum_samples = np.zeros((dim1, dim2))
while i < len(mat):
sum_samples = sum_samples + mat[i: i + dim1, j : j + dim2]
i += dim1
j += dim2
return sum_samples
"""Stochastic Gradient Descent for training in epochs"""
def SGD(self, training_data, epochs, mini_batch_size, eta, test_data = None):
n = len(training_data)
if test_data:
n_test = len(test_data)
for j in xrange(epochs):
np.random.shuffle(training_data) # for each epochs the mini-batches are selected randomly
mini_batches = [training_data[k: k+mini_batch_size] for k in xrange(0, n, mini_batch_size)] # select equal sizes of mini-batches for the epochs (last mini_batch size might differ however)
c = 1
for mini_batch in mini_batches:
print "Updating mini-batch {0}".format(c)
self.update_mini_batch(mini_batch, eta)
c += 1
if test_data:
print "Epoch {0}: {1}/{2}".format(j, self.evaluate(test_data), n_test)
else:
print "Epoch {0} completed.".format(j)
def evaluate(self, test_data):
test_results = [(np.argmax(self.feed_forward(x)), y) for (x, y) in test_data]
return (sum(int(x == y) for x, y in test_results))
def export_results(self, test_data):
results = [(np.argmax(self.feed_forward(x)), y) for (x, y) in test_data]
k = pd.DataFrame(results)
k.to_csv('net_results.csv')
# Global functions
## Activation function (sigmoid)
@np.vectorize
def sigmoid(z):
return 1.0/(1.0 + np.exp(-z))
## Activation derivative (sigmoid_prime)
@np.vectorize
def sigmoid_prime(z):
return sigmoid(z)*(1 - sigmoid(z))
这是我的代码。在我的机器上迭代 30 个 epoch 所花费的时间从 800+ 秒减少到 200+ 秒。
由于我是 python 的新手,所以我使用现成的东西。这段代码只需要 numpy 到 运行.
试一试。
def feedforward2(self, a):
zs = []
activations = [a]
activation = a
for b, w in zip(self.biases, self.weights):
z = np.dot(w, activation) + b
zs.append(z)
activation = sigmoid(z)
activations.append(activation)
return (zs, activations)
def update_mini_batch2(self, mini_batch, eta):
batch_size = len(mini_batch)
# transform to (input x batch_size) matrix
x = np.asarray([_x.ravel() for _x, _y in mini_batch]).transpose()
# transform to (output x batch_size) matrix
y = np.asarray([_y.ravel() for _x, _y in mini_batch]).transpose()
nabla_b, nabla_w = self.backprop2(x, y)
self.weights = [w - (eta / batch_size) * nw for w, nw in zip(self.weights, nabla_w)]
self.biases = [b - (eta / batch_size) * nb for b, nb in zip(self.biases, nabla_b)]
return
def backprop2(self, x, y):
nabla_b = [0 for i in self.biases]
nabla_w = [0 for i in self.weights]
# feedforward
zs, activations = self.feedforward2(x)
# backward pass
delta = self.cost_derivative(activations[-1], y) * sigmoid_prime(zs[-1])
nabla_b[-1] = delta.sum(1).reshape([len(delta), 1]) # reshape to (n x 1) matrix
nabla_w[-1] = np.dot(delta, activations[-2].transpose())
for l in xrange(2, self.num_layers):
z = zs[-l]
sp = sigmoid_prime(z)
delta = np.dot(self.weights[-l + 1].transpose(), delta) * sp
nabla_b[-l] = delta.sum(1).reshape([len(delta), 1]) # reshape to (n x 1) matrix
nabla_w[-l] = np.dot(delta, activations[-l - 1].transpose())
return (nabla_b, nabla_w)
我根据书上的原始代码,修改了一些小的代码。代码如下
import random
import numpy as np
class Network(object):
def __init__(self, sizes):
self.num_layers = len(sizes)
self.sizes = sizes
self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
self.weights = [np.random.randn(y, x)
for x, y in zip(sizes[:-1], sizes[1:])]
def feedforward(self, a):
for b, w in zip(self.biases, self.weights):
a = sigmoid(np.dot(w, a) + b)
return a
def SGD(self, training_data, epochs, mini_batch_size, eta,
test_data=None):
training_data = list(training_data)
n = len(training_data)
if test_data:
test_data = list(test_data)
n_test = len(test_data)
for j in range(epochs):
random.shuffle(training_data)
mini_batches = [
training_data[k:k + mini_batch_size]
for k in range(0, n, mini_batch_size)]
for mini_batch in mini_batches:
self.update_mini_batch(mini_batch, eta)
if test_data:
print("Epoch {} : {} / {}".format(j, self.evaluate(test_data), n_test))
else:
print("Epoch {} complete".format(j))
def update_mini_batch(self, mini_batch, eta):
nabla_b = [np.zeros(b.shape) for b in self.biases]
nabla_w = [np.zeros(w.shape) for w in self.weights]
x_matrix_0 = [x for x, y in mini_batch]
y_matrix_0 = [y for x, y in mini_batch]
x_matrix = np.concatenate(x_matrix_0, axis=1)
y_matrix = np.concatenate(y_matrix_0, axis=1)
nabla_b, nabla_w = self.backprop(x_matrix, y_matrix)
self.weights = [w - (eta / len(mini_batch)) * nw
for w, nw in zip(self.weights, nabla_w)]
self.biases = [b - (eta / len(mini_batch)) * nb
for b, nb in zip(self.biases, nabla_b)]
def backprop(self, x, y):
nabla_b = [np.zeros(b.shape) for b in self.biases]
nabla_w = [np.zeros(w.shape) for w in self.weights]
# feedforward
activation = x
activations = [x] # list to store all the activations, layer by layer
zs = [] # list to store all the z vectors, layer by layer
for b, w in zip(self.biases, self.weights):
z = np.dot(w, activation) + np.kron(b, np.ones([1, y.shape[1]]))
zs.append(z)
activation = sigmoid(z)
activations.append(activation)
# backward pass
delta = self.cost_derivative(activations[-1], y) * sigmoid_prime(zs[-1])
nabla_b[-1] = np.reshape([np.sum(nb) for nb in delta], [delta.shape[0], 1])
for _d, _a in zip(delta.transpose(), activations[-2].transpose()):
_d = np.reshape(_d, [len(_d), 1])
_a = np.reshape(_a, [len(_a), 1])
nabla_w[-1] += np.dot(_d, _a.transpose())
for l in range(2, self.num_layers):
z = zs[-l]
sp = sigmoid_prime(z)
delta = np.dot(self.weights[-l + 1].transpose(), delta) * sp
nabla_b[-l] = np.reshape([np.sum(nb) for nb in delta], [delta.shape[0], 1])
for _d, _a in zip(delta.transpose(), activations[-l-1].transpose()):
_d = np.reshape(_d, [len(_d), 1])
_a = np.reshape(_a, [len(_a), 1])
nabla_w[-l] += np.dot(_d, _a.transpose())
return nabla_b, nabla_w
def cost_derivative(self, output_activations, y):
return (output_activations - y)
def sigmoid(z):
return 1.0 / (1.0 + np.exp(-z))
def sigmoid_prime(z):
return sigmoid(z) * (1 - sigmoid(z))
我最近正在学习人工神经网络 (ANN),并且有一个代码可以工作,并且 运行 在 Python 中基于小批量训练进行相同的编码。我遵循了 Michael Nilson's Neural Networks and Deep Learning 的书,其中为初学者逐步解释了每个算法。 还有一个完整的手写数字识别代码,对我来说也很好用。
但是,我正在尝试通过将整个小批量传递在一起以通过矩阵形式的反向传播进行训练来稍微调整代码。我也为此开发了一个工作代码,但代码在 运行 时执行得非常慢。有什么方法可以实现基于全矩阵的基于反向传播算法的网络小批量学习方法?
import numpy as np
import pandas as pd
class Network:
def __init__(self, sizes):
self.layers = len(sizes)
self.sizes = sizes
self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
self.weights = [np.random.randn(y, x) for y, x in zip(sizes[1:], sizes[:-1])]
def feed_forward(self, a):
for w, b in zip(self.weights, self.biases):
a = sigmoid(np.dot(w,a) + b)
return a
# Calculate the cost derivative (Gradient of C w.r.t. 'a' - Nabla C(a))
def cost_derivative(self, output_activation, y):
return (output_activation - y)
def update_mini_batch(self, mini_batch, eta):
from scipy.linalg import block_diag
n = len(mini_batch)
xs = [x for x, y in mini_batch]
features = block_diag(*xs)
ys = [y for x, y in mini_batch]
responses = block_diag(*ys)
ws = [a for a in self.weights for i in xrange(n)]
new_list = []
k = 0
while (k < len(ws)):
new_list.append(ws[k: k + n])
k += n
weights = [block_diag(*elems) for elems in new_list]
bs = [b for b in self.biases for i in xrange(n)]
new_list2 = []
j = 0
while (j < len(bs)):
new_list2.append(bs[j : j + n])
j += n
biases = [block_diag(*elems) for elems in new_list2]
baises_dim_1 = [np.dot(np.ones((n*b.shape[0], b.shape[0])), b) for b in self.biases]
biases_dim_2 = [np.dot(b, np.ones((b.shape[1], n*b.shape[1]))) for b in baises_dim_1]
weights_dim_1 = [np.dot(np.ones((n*w.shape[0], w.shape[0])), w) for w in self.weights]
weights_dim_2 = [np.dot(w, np.ones((w.shape[1], n*w.shape[1]))) for w in weights_dim_1]
nabla_b = [np.zeros(b.shape) for b in biases_dim_2]
nabla_w = [np.zeros(w.shape) for w in weights_dim_2]
delta_b = [np.zeros(b.shape) for b in self.biases]
delta_w = [np.zeros(w.shape) for w in self.weights]
zs = []
activation = features
activations = [features]
for w, b in zip(weights, biases):
z = np.dot(w, activation) + b
zs.append(z)
activation = sigmoid(z)
activations.append(activation)
delta = self.cost_derivative(activations[-1], responses) * sigmoid_prime(zs[-1])
nabla_b[-1] = delta
nabla_w[-1] = np.dot(delta, activations[-2].transpose())
for l in xrange(2, self.layers):
z = zs[-l] # the weighted input for that layer
activation_prime = sigmoid_prime(z) # the derivative of activation for the layer
delta = np.dot(weights[-l + 1].transpose(), delta) * activation_prime # calculate the adjustment term (delta) for that layer
nabla_b[-l] = delta # calculate the bias adjustments - by means of using eq-BP3.
nabla_w[-l] = np.dot(delta, activations[-l-1].transpose()) # calculate the weight adjustments - by means of using eq-BP4.
delta_b = [self.split_cases(b, n) for b in nabla_b]
delta_w = [self.split_cases(w, n) for w in nabla_w]
self.weights = [w - (eta/n) * nw for w, nw in zip(self.weights, delta_w)]
self.biases = [b - (eta/ n) * nb for b, nb in zip(self.biases, delta_b)]
def split_cases(self, mat, mini_batch_size):
i = 0
j = 0
dim1 = mat.shape[0]/mini_batch_size
dim2 = mat.shape[1]/mini_batch_size
sum_samples = np.zeros((dim1, dim2))
while i < len(mat):
sum_samples = sum_samples + mat[i: i + dim1, j : j + dim2]
i += dim1
j += dim2
return sum_samples
"""Stochastic Gradient Descent for training in epochs"""
def SGD(self, training_data, epochs, mini_batch_size, eta, test_data = None):
n = len(training_data)
if test_data:
n_test = len(test_data)
for j in xrange(epochs):
np.random.shuffle(training_data) # for each epochs the mini-batches are selected randomly
mini_batches = [training_data[k: k+mini_batch_size] for k in xrange(0, n, mini_batch_size)] # select equal sizes of mini-batches for the epochs (last mini_batch size might differ however)
c = 1
for mini_batch in mini_batches:
print "Updating mini-batch {0}".format(c)
self.update_mini_batch(mini_batch, eta)
c += 1
if test_data:
print "Epoch {0}: {1}/{2}".format(j, self.evaluate(test_data), n_test)
else:
print "Epoch {0} completed.".format(j)
def evaluate(self, test_data):
test_results = [(np.argmax(self.feed_forward(x)), y) for (x, y) in test_data]
return (sum(int(x == y) for x, y in test_results))
def export_results(self, test_data):
results = [(np.argmax(self.feed_forward(x)), y) for (x, y) in test_data]
k = pd.DataFrame(results)
k.to_csv('net_results.csv')
# Global functions
## Activation function (sigmoid)
@np.vectorize
def sigmoid(z):
return 1.0/(1.0 + np.exp(-z))
## Activation derivative (sigmoid_prime)
@np.vectorize
def sigmoid_prime(z):
return sigmoid(z)*(1 - sigmoid(z))
这是我的代码。在我的机器上迭代 30 个 epoch 所花费的时间从 800+ 秒减少到 200+ 秒。
由于我是 python 的新手,所以我使用现成的东西。这段代码只需要 numpy 到 运行.
试一试。
def feedforward2(self, a):
zs = []
activations = [a]
activation = a
for b, w in zip(self.biases, self.weights):
z = np.dot(w, activation) + b
zs.append(z)
activation = sigmoid(z)
activations.append(activation)
return (zs, activations)
def update_mini_batch2(self, mini_batch, eta):
batch_size = len(mini_batch)
# transform to (input x batch_size) matrix
x = np.asarray([_x.ravel() for _x, _y in mini_batch]).transpose()
# transform to (output x batch_size) matrix
y = np.asarray([_y.ravel() for _x, _y in mini_batch]).transpose()
nabla_b, nabla_w = self.backprop2(x, y)
self.weights = [w - (eta / batch_size) * nw for w, nw in zip(self.weights, nabla_w)]
self.biases = [b - (eta / batch_size) * nb for b, nb in zip(self.biases, nabla_b)]
return
def backprop2(self, x, y):
nabla_b = [0 for i in self.biases]
nabla_w = [0 for i in self.weights]
# feedforward
zs, activations = self.feedforward2(x)
# backward pass
delta = self.cost_derivative(activations[-1], y) * sigmoid_prime(zs[-1])
nabla_b[-1] = delta.sum(1).reshape([len(delta), 1]) # reshape to (n x 1) matrix
nabla_w[-1] = np.dot(delta, activations[-2].transpose())
for l in xrange(2, self.num_layers):
z = zs[-l]
sp = sigmoid_prime(z)
delta = np.dot(self.weights[-l + 1].transpose(), delta) * sp
nabla_b[-l] = delta.sum(1).reshape([len(delta), 1]) # reshape to (n x 1) matrix
nabla_w[-l] = np.dot(delta, activations[-l - 1].transpose())
return (nabla_b, nabla_w)
我根据书上的原始代码,修改了一些小的代码。代码如下
import random
import numpy as np
class Network(object):
def __init__(self, sizes):
self.num_layers = len(sizes)
self.sizes = sizes
self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
self.weights = [np.random.randn(y, x)
for x, y in zip(sizes[:-1], sizes[1:])]
def feedforward(self, a):
for b, w in zip(self.biases, self.weights):
a = sigmoid(np.dot(w, a) + b)
return a
def SGD(self, training_data, epochs, mini_batch_size, eta,
test_data=None):
training_data = list(training_data)
n = len(training_data)
if test_data:
test_data = list(test_data)
n_test = len(test_data)
for j in range(epochs):
random.shuffle(training_data)
mini_batches = [
training_data[k:k + mini_batch_size]
for k in range(0, n, mini_batch_size)]
for mini_batch in mini_batches:
self.update_mini_batch(mini_batch, eta)
if test_data:
print("Epoch {} : {} / {}".format(j, self.evaluate(test_data), n_test))
else:
print("Epoch {} complete".format(j))
def update_mini_batch(self, mini_batch, eta):
nabla_b = [np.zeros(b.shape) for b in self.biases]
nabla_w = [np.zeros(w.shape) for w in self.weights]
x_matrix_0 = [x for x, y in mini_batch]
y_matrix_0 = [y for x, y in mini_batch]
x_matrix = np.concatenate(x_matrix_0, axis=1)
y_matrix = np.concatenate(y_matrix_0, axis=1)
nabla_b, nabla_w = self.backprop(x_matrix, y_matrix)
self.weights = [w - (eta / len(mini_batch)) * nw
for w, nw in zip(self.weights, nabla_w)]
self.biases = [b - (eta / len(mini_batch)) * nb
for b, nb in zip(self.biases, nabla_b)]
def backprop(self, x, y):
nabla_b = [np.zeros(b.shape) for b in self.biases]
nabla_w = [np.zeros(w.shape) for w in self.weights]
# feedforward
activation = x
activations = [x] # list to store all the activations, layer by layer
zs = [] # list to store all the z vectors, layer by layer
for b, w in zip(self.biases, self.weights):
z = np.dot(w, activation) + np.kron(b, np.ones([1, y.shape[1]]))
zs.append(z)
activation = sigmoid(z)
activations.append(activation)
# backward pass
delta = self.cost_derivative(activations[-1], y) * sigmoid_prime(zs[-1])
nabla_b[-1] = np.reshape([np.sum(nb) for nb in delta], [delta.shape[0], 1])
for _d, _a in zip(delta.transpose(), activations[-2].transpose()):
_d = np.reshape(_d, [len(_d), 1])
_a = np.reshape(_a, [len(_a), 1])
nabla_w[-1] += np.dot(_d, _a.transpose())
for l in range(2, self.num_layers):
z = zs[-l]
sp = sigmoid_prime(z)
delta = np.dot(self.weights[-l + 1].transpose(), delta) * sp
nabla_b[-l] = np.reshape([np.sum(nb) for nb in delta], [delta.shape[0], 1])
for _d, _a in zip(delta.transpose(), activations[-l-1].transpose()):
_d = np.reshape(_d, [len(_d), 1])
_a = np.reshape(_a, [len(_a), 1])
nabla_w[-l] += np.dot(_d, _a.transpose())
return nabla_b, nabla_w
def cost_derivative(self, output_activations, y):
return (output_activations - y)
def sigmoid(z):
return 1.0 / (1.0 + np.exp(-z))
def sigmoid_prime(z):
return sigmoid(z) * (1 - sigmoid(z))