Keras 历史平均自定义损失函数
Keras historical averaging custom loss function
我目前正在 Keras 中试验生成对抗网络。
正如 this 论文中提出的,我想使用历史平均损失函数。这意味着我想惩罚网络权重的变化。
我不确定如何巧妙地实现它。
我正在根据 post 的答案实现自定义损失函数。
def historical_averaging_wrapper(current_weights, prev_weights):
def historical_averaging(y_true, y_pred):
diff = 0
for i in range(len(current_weights)):
diff += abs(np.sum(current_weights[i]) + np.sum(prev_weights[i]))
return K.binary_crossentropy(y_true, y_pred) + diff
return historical_averaging
对网络的权重进行惩罚,每批数据后权重都在变化
我的第一个想法是在每批之后更新损失函数。
大致是这样的:
prev_weights = model.get_weights()
for i in range(len(data)/batch_len):
current_weights = model.get_weights()
model.compile(loss=historical_averaging_wrapper(current_weights, prev_weights), optimizer='adam')
model.fit(training_data[i*batch_size:(i+1)*batch_size], training_labels[i*batch_size:(i+1)*batch_size], epochs=1, batch_size=batch_size)
prev_weights = current_weights
这样合理吗?这种做法在我看来似乎有点"messy"。
是否有另一种可能以 "smarter" 的方式做到这一点?
就像更新数据生成器中的损失函数并使用 fit_generator() 一样?
提前致谢。
损失函数是使用张量对图进行的运算。
您可以在损失函数中定义额外的张量来保存以前的值。这是一个例子:
import tensorflow as tf
import tensorflow.keras.backend as K
keras = tf.keras
class HistoricalAvgLoss(object):
def __init__(self, model):
# create tensors (initialized to zero) to hold the previous value of the
# weights
self.prev_weights = []
for w in model.get_weights():
self.prev_weights.append(K.variable(np.zeros(w.shape)))
def loss(self, y_true, y_pred):
err = keras.losses.mean_squared_error(y_true, y_pred)
werr = [K.mean(K.abs(c - p)) for c, p in zip(model.get_weights(), self.prev_weights)]
self.prev_weights = K.in_train_phase(
[K.update(p, c) for c, p in zip(model.get_weights(), self.prev_weights)],
self.prev_weights
)
return K.in_train_phase(err + K.sum(werr), err)
变量prev_weights
保存以前的值。请注意,我们在计算权重误差后添加了一个 K.update
操作。
用于测试的示例模型:
model = keras.models.Sequential([
keras.layers.Input(shape=(4,)),
keras.layers.Dense(8),
keras.layers.Dense(4),
keras.layers.Dense(1),
])
loss_obj = HistoricalAvgLoss(model)
model.compile('adam', loss_obj.loss)
model.summary()
一些测试数据和objective函数:
import numpy as np
def test_fn(x):
return x[0]*x[1] + 2.0 * x[1]**2 + x[2]/x[3] + 3.0 * x[3]
X = np.random.rand(1000, 4)
y = np.apply_along_axis(test_fn, 1, X)
hist = model.fit(X, y, validation_split=0.25, epochs=10)
在我的测试中,模型损失随时间减少。
我目前正在 Keras 中试验生成对抗网络。 正如 this 论文中提出的,我想使用历史平均损失函数。这意味着我想惩罚网络权重的变化。 我不确定如何巧妙地实现它。
我正在根据
def historical_averaging_wrapper(current_weights, prev_weights):
def historical_averaging(y_true, y_pred):
diff = 0
for i in range(len(current_weights)):
diff += abs(np.sum(current_weights[i]) + np.sum(prev_weights[i]))
return K.binary_crossentropy(y_true, y_pred) + diff
return historical_averaging
对网络的权重进行惩罚,每批数据后权重都在变化
我的第一个想法是在每批之后更新损失函数。 大致是这样的:
prev_weights = model.get_weights()
for i in range(len(data)/batch_len):
current_weights = model.get_weights()
model.compile(loss=historical_averaging_wrapper(current_weights, prev_weights), optimizer='adam')
model.fit(training_data[i*batch_size:(i+1)*batch_size], training_labels[i*batch_size:(i+1)*batch_size], epochs=1, batch_size=batch_size)
prev_weights = current_weights
这样合理吗?这种做法在我看来似乎有点"messy"。 是否有另一种可能以 "smarter" 的方式做到这一点? 就像更新数据生成器中的损失函数并使用 fit_generator() 一样? 提前致谢。
损失函数是使用张量对图进行的运算。 您可以在损失函数中定义额外的张量来保存以前的值。这是一个例子:
import tensorflow as tf
import tensorflow.keras.backend as K
keras = tf.keras
class HistoricalAvgLoss(object):
def __init__(self, model):
# create tensors (initialized to zero) to hold the previous value of the
# weights
self.prev_weights = []
for w in model.get_weights():
self.prev_weights.append(K.variable(np.zeros(w.shape)))
def loss(self, y_true, y_pred):
err = keras.losses.mean_squared_error(y_true, y_pred)
werr = [K.mean(K.abs(c - p)) for c, p in zip(model.get_weights(), self.prev_weights)]
self.prev_weights = K.in_train_phase(
[K.update(p, c) for c, p in zip(model.get_weights(), self.prev_weights)],
self.prev_weights
)
return K.in_train_phase(err + K.sum(werr), err)
变量prev_weights
保存以前的值。请注意,我们在计算权重误差后添加了一个 K.update
操作。
用于测试的示例模型:
model = keras.models.Sequential([
keras.layers.Input(shape=(4,)),
keras.layers.Dense(8),
keras.layers.Dense(4),
keras.layers.Dense(1),
])
loss_obj = HistoricalAvgLoss(model)
model.compile('adam', loss_obj.loss)
model.summary()
一些测试数据和objective函数:
import numpy as np
def test_fn(x):
return x[0]*x[1] + 2.0 * x[1]**2 + x[2]/x[3] + 3.0 * x[3]
X = np.random.rand(1000, 4)
y = np.apply_along_axis(test_fn, 1, X)
hist = model.fit(X, y, validation_split=0.25, epochs=10)
在我的测试中,模型损失随时间减少。