Keras 中带有权重的自定义损失函数
Custom loss function with weights in Keras
我是神经网络的新手。我想在TensorFlow中做一个自定义的损失函数,但是我需要得到一个权重向量,所以我是这样做的:
def my_loss(weights):
def custom_loss(y, y_pred):
return weights*(y - y_pred)
return custom_loss
model.compile(optimizer='adam', loss=my_loss(weights), metrics=['accuracy'])
model.fit(x_train, y_train, batch_size=None, validation_data=(x_test, y_test), epochs=100)
当我启动它时,我收到这个错误:
InvalidArgumentError: Incompatible shapes: [50000,10] vs. [32,10]
形状是:
print(weights.shape)
print(y_train.shape)
(50000, 10)
(50000, 10)
所以我认为这是批处理的问题,我没有很强的 TensorFlow 背景,所以我尝试使用全局变量以一种天真的方式解决
batch_index = 0
然后在自定义回调中将其更新到 "on_batch_begin" 挂钩中。但它没有用,这是一个可怕的解决方案。那么,我怎样才能得到权重的确切部分与相应的 y 呢?我有办法在自定义损失中获取当前批次索引吗?
预先感谢您的帮助
Keras 允许您从全局范围获取任何张量。实际上,y_true
和y_pred
甚至可能都用不到,as here。
您的模型可以有多个输入(您可以在推理时将此输入设为虚拟输入,或将权重加载到具有单个输入的模型中)。请注意,您仍然需要它进行验证。
import keras
from keras.layers import *
from keras import backend as K
import numpy as np
inputs_x = Input(shape=(10,))
inputs_w = Input(shape=(10,))
y = Dense(10,kernel_initializer='glorot_uniform' )(inputs_x)
model = keras.Model(inputs=[inputs_x, inputs_w], outputs=[y])
def my_loss(y_true, y_pred):
return K.abs((y_true-y_pred)*inputs_w)
def my_metrics(y_true, y_pred):
# just to output something
return K.mean(inputs_w)
model.compile(optimizer='adam', loss=[my_loss], metrics=[my_metrics])
data = np.random.normal(size=(50000, 10))
labels = np.random.normal(size=(50000, 10))
weights = np.random.normal(size=(50000, 10))
model.fit([data, weights], labels, batch_size=256, validation_data=([data[:100], weights[:100]], labels[:100]), epochs=100)
要在没有权重的情况下进行验证,您需要编译另一个不使用权重的具有不同损失的模型版本。
UPD:另请注意,如果 Keras 是 returns 数组而不是标量
,则 Keras 将总结您损失的所有元素
更新:Tor tensorflow 2.1.0 似乎变得更复杂了。方法是按照@marco-cerliani 指出的方向(标签、权重和数据被提供给模型,自定义损失张量通过 .add_loss()
添加),但是他的解决方案对我不起作用盒子。第一件事是模型不想使用 None 损失,拒绝接受输入和输出。所以,我引入了额外的虚拟损失函数。当数据集大小不能被批量大小整除时,第二个问题出现了。在 keras 和 tf 1.x 中,最后一批问题通常由 steps_per_epoch
和 validation_steps
参数解决,但这里如果在 Epoch 2 的第一批开始失败。所以我需要进行简单的自定义数据生成器。
import tensorflow.keras as keras
from tensorflow.keras.layers import *
from tensorflow.keras import backend as K
import numpy as np
inputs_x = Input(shape=(10,))
inputs_w = Input(shape=(10,))
inputs_l = Input(shape=(10,))
y = Dense(10,kernel_initializer='glorot_uniform' )(inputs_x)
model = keras.Model(inputs=[inputs_x, inputs_w, inputs_l], outputs=[y])
def my_loss(y_true, y_pred):
return K.abs((y_true-y_pred)*inputs_w)
def my_metrics():
# just to output something
return K.mean(inputs_w)
def dummy_loss(y_true, y_pred):
return 0.
loss = my_loss(y, inputs_l)
metric = my_metrics()
model.add_loss(loss)
model.add_metric(metric, name='my_metric', aggregation='mean')
model.compile(optimizer='adam', loss=dummy_loss)
data = np.random.normal(size=(50000, 10))
labels = np.random.normal(size=(50000, 10))
weights = np.random.normal(size=(50000, 10))
dummy = np.zeros(shape=(50000, 10)) # or in can be labels, no matter now
# looks like it does not like when len(data) % batch_size != 0
# If I set steps_per_epoch, it fails on the second epoch.
# So, I proceded with data generator
class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, x, w, y, y2, batch_size, shuffle=True):
'Initialization'
self.x = x
self.w = w
self.y = y
self.y2 = y2
self.indices = list(range(len(self.x)))
self.shuffle = shuffle
self.batch_size = batch_size
self.on_epoch_end()
def __len__(self):
'Denotes the number of batches per epoch'
return len(self.indices) // self.batch_size
def __getitem__(self, index):
'Generate one batch of data'
# Generate indexes of the batch
ids = self.indices[index*self.batch_size:(index+1)*self.batch_size]
# the last None to remove weird warning
#
return [self.x[ids], self.w[ids], self.y[ids]], self.y2[ids], [None]
def on_epoch_end(self):
'Updates indexes after each epoch'
if self.shuffle == True:
np.random.shuffle(self.indices)
batch_size = 256
train_generator = DataGenerator(data,weights,labels, dummy, batch_size=batch_size, shuffle=True)
val_generator = DataGenerator(data[:2*batch_size],weights[:2*batch_size],labels[:2*batch_size], dummy[:2*batch_size], batch_size=batch_size, shuffle=True)
model.fit(x=train_generator, validation_data=val_generator,epochs=100)
这是一种将附加参数传递给自定义损失函数的变通方法,在您的例子中是一个权重数组。诀窍在于使用虚假输入,这些输入有助于以正确的方式构建和使用损失。不要忘记 keras 处理固定的批次维度
我在回归问题中提供了一个虚拟示例
def mse(y_true, y_pred, weights):
error = y_true-y_pred
return K.mean(K.square(error) + K.sqrt(weights))
X = np.random.uniform(0,1, (1000,10))
y = np.random.uniform(0,1, 1000)
w = np.random.uniform(0,1, 1000)
inp = Input((10,))
true = Input((1,))
weights = Input((1,))
x = Dense(32, activation='relu')(inp)
out = Dense(1)(x)
m = Model([inp,true,weights], out)
m.add_loss( mse( true, out, weights ) )
m.compile(loss=None, optimizer='adam')
m.fit(x=[X, y, w], y=None, epochs=3)
## final fitted model to compute predictions (remove W if not needed)
final_m = Model(inp, out)
像@Michael Moretti 一样,我也是新手(深度学习、Python、TensorFlow、Keras 等)。这个问题是大约 19 个月前提出的,在“TF 年”中事情发展得很快。
显然在某些时候,您可以编写一个带有参数 (y_true, y_pred)
的 Python 函数并将其传递给您对 model.compile()
的调用,一切都很好。现在这似乎在一些简单的情况下有效,但在一般情况下无效。在试图理解为什么它对我不起作用时,我发现了这个 SO 问题和其他相关问题。这是@M.Innat对 that got me on the right track. But in fact his relevant final example CustomMSE
is cribbed from the Keras Guide section on Custom Losses的回答。此示例展示了如何编写与 TensorFlow 版本完全兼容的自定义损失:2.7.0,以及如何通过 class 的构造函数向其传递附加参数基于 keras.losses.Loss
调用 model.compile()
:
class CustomMSE(keras.losses.Loss):
def __init__(self, regularization_factor=0.1, name="custom_mse"):
super().__init__(name=name)
self.regularization_factor = regularization_factor
def call(self, y_true, y_pred):
mse = tf.math.reduce_mean(tf.square(y_true - y_pred))
reg = tf.math.reduce_mean(tf.square(0.5 - y_pred))
return mse + reg * self.regularization_factor
model.compile(optimizer=keras.optimizers.Adam(), loss=CustomMSE())
为获得最佳结果,请确保自定义损失函数内的所有计算(即自定义损失 class 的 call()
方法)均使用 TensorFlow 运算符完成,并且所有输入输出数据表示为 TF 张量。
我是神经网络的新手。我想在TensorFlow中做一个自定义的损失函数,但是我需要得到一个权重向量,所以我是这样做的:
def my_loss(weights):
def custom_loss(y, y_pred):
return weights*(y - y_pred)
return custom_loss
model.compile(optimizer='adam', loss=my_loss(weights), metrics=['accuracy'])
model.fit(x_train, y_train, batch_size=None, validation_data=(x_test, y_test), epochs=100)
当我启动它时,我收到这个错误:
InvalidArgumentError: Incompatible shapes: [50000,10] vs. [32,10]
形状是:
print(weights.shape)
print(y_train.shape)
(50000, 10)
(50000, 10)
所以我认为这是批处理的问题,我没有很强的 TensorFlow 背景,所以我尝试使用全局变量以一种天真的方式解决
batch_index = 0
然后在自定义回调中将其更新到 "on_batch_begin" 挂钩中。但它没有用,这是一个可怕的解决方案。那么,我怎样才能得到权重的确切部分与相应的 y 呢?我有办法在自定义损失中获取当前批次索引吗? 预先感谢您的帮助
Keras 允许您从全局范围获取任何张量。实际上,y_true
和y_pred
甚至可能都用不到,as here。
您的模型可以有多个输入(您可以在推理时将此输入设为虚拟输入,或将权重加载到具有单个输入的模型中)。请注意,您仍然需要它进行验证。
import keras
from keras.layers import *
from keras import backend as K
import numpy as np
inputs_x = Input(shape=(10,))
inputs_w = Input(shape=(10,))
y = Dense(10,kernel_initializer='glorot_uniform' )(inputs_x)
model = keras.Model(inputs=[inputs_x, inputs_w], outputs=[y])
def my_loss(y_true, y_pred):
return K.abs((y_true-y_pred)*inputs_w)
def my_metrics(y_true, y_pred):
# just to output something
return K.mean(inputs_w)
model.compile(optimizer='adam', loss=[my_loss], metrics=[my_metrics])
data = np.random.normal(size=(50000, 10))
labels = np.random.normal(size=(50000, 10))
weights = np.random.normal(size=(50000, 10))
model.fit([data, weights], labels, batch_size=256, validation_data=([data[:100], weights[:100]], labels[:100]), epochs=100)
要在没有权重的情况下进行验证,您需要编译另一个不使用权重的具有不同损失的模型版本。
UPD:另请注意,如果 Keras 是 returns 数组而不是标量
,则 Keras 将总结您损失的所有元素更新:Tor tensorflow 2.1.0 似乎变得更复杂了。方法是按照@marco-cerliani 指出的方向(标签、权重和数据被提供给模型,自定义损失张量通过 .add_loss()
添加),但是他的解决方案对我不起作用盒子。第一件事是模型不想使用 None 损失,拒绝接受输入和输出。所以,我引入了额外的虚拟损失函数。当数据集大小不能被批量大小整除时,第二个问题出现了。在 keras 和 tf 1.x 中,最后一批问题通常由 steps_per_epoch
和 validation_steps
参数解决,但这里如果在 Epoch 2 的第一批开始失败。所以我需要进行简单的自定义数据生成器。
import tensorflow.keras as keras
from tensorflow.keras.layers import *
from tensorflow.keras import backend as K
import numpy as np
inputs_x = Input(shape=(10,))
inputs_w = Input(shape=(10,))
inputs_l = Input(shape=(10,))
y = Dense(10,kernel_initializer='glorot_uniform' )(inputs_x)
model = keras.Model(inputs=[inputs_x, inputs_w, inputs_l], outputs=[y])
def my_loss(y_true, y_pred):
return K.abs((y_true-y_pred)*inputs_w)
def my_metrics():
# just to output something
return K.mean(inputs_w)
def dummy_loss(y_true, y_pred):
return 0.
loss = my_loss(y, inputs_l)
metric = my_metrics()
model.add_loss(loss)
model.add_metric(metric, name='my_metric', aggregation='mean')
model.compile(optimizer='adam', loss=dummy_loss)
data = np.random.normal(size=(50000, 10))
labels = np.random.normal(size=(50000, 10))
weights = np.random.normal(size=(50000, 10))
dummy = np.zeros(shape=(50000, 10)) # or in can be labels, no matter now
# looks like it does not like when len(data) % batch_size != 0
# If I set steps_per_epoch, it fails on the second epoch.
# So, I proceded with data generator
class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, x, w, y, y2, batch_size, shuffle=True):
'Initialization'
self.x = x
self.w = w
self.y = y
self.y2 = y2
self.indices = list(range(len(self.x)))
self.shuffle = shuffle
self.batch_size = batch_size
self.on_epoch_end()
def __len__(self):
'Denotes the number of batches per epoch'
return len(self.indices) // self.batch_size
def __getitem__(self, index):
'Generate one batch of data'
# Generate indexes of the batch
ids = self.indices[index*self.batch_size:(index+1)*self.batch_size]
# the last None to remove weird warning
#
return [self.x[ids], self.w[ids], self.y[ids]], self.y2[ids], [None]
def on_epoch_end(self):
'Updates indexes after each epoch'
if self.shuffle == True:
np.random.shuffle(self.indices)
batch_size = 256
train_generator = DataGenerator(data,weights,labels, dummy, batch_size=batch_size, shuffle=True)
val_generator = DataGenerator(data[:2*batch_size],weights[:2*batch_size],labels[:2*batch_size], dummy[:2*batch_size], batch_size=batch_size, shuffle=True)
model.fit(x=train_generator, validation_data=val_generator,epochs=100)
这是一种将附加参数传递给自定义损失函数的变通方法,在您的例子中是一个权重数组。诀窍在于使用虚假输入,这些输入有助于以正确的方式构建和使用损失。不要忘记 keras 处理固定的批次维度
我在回归问题中提供了一个虚拟示例
def mse(y_true, y_pred, weights):
error = y_true-y_pred
return K.mean(K.square(error) + K.sqrt(weights))
X = np.random.uniform(0,1, (1000,10))
y = np.random.uniform(0,1, 1000)
w = np.random.uniform(0,1, 1000)
inp = Input((10,))
true = Input((1,))
weights = Input((1,))
x = Dense(32, activation='relu')(inp)
out = Dense(1)(x)
m = Model([inp,true,weights], out)
m.add_loss( mse( true, out, weights ) )
m.compile(loss=None, optimizer='adam')
m.fit(x=[X, y, w], y=None, epochs=3)
## final fitted model to compute predictions (remove W if not needed)
final_m = Model(inp, out)
像@Michael Moretti 一样,我也是新手(深度学习、Python、TensorFlow、Keras 等)。这个问题是大约 19 个月前提出的,在“TF 年”中事情发展得很快。
显然在某些时候,您可以编写一个带有参数 (y_true, y_pred)
的 Python 函数并将其传递给您对 model.compile()
的调用,一切都很好。现在这似乎在一些简单的情况下有效,但在一般情况下无效。在试图理解为什么它对我不起作用时,我发现了这个 SO 问题和其他相关问题。这是@M.Innat对CustomMSE
is cribbed from the Keras Guide section on Custom Losses的回答。此示例展示了如何编写与 TensorFlow 版本完全兼容的自定义损失:2.7.0,以及如何通过 class 的构造函数向其传递附加参数基于 keras.losses.Loss
调用 model.compile()
:
class CustomMSE(keras.losses.Loss):
def __init__(self, regularization_factor=0.1, name="custom_mse"):
super().__init__(name=name)
self.regularization_factor = regularization_factor
def call(self, y_true, y_pred):
mse = tf.math.reduce_mean(tf.square(y_true - y_pred))
reg = tf.math.reduce_mean(tf.square(0.5 - y_pred))
return mse + reg * self.regularization_factor
model.compile(optimizer=keras.optimizers.Adam(), loss=CustomMSE())
为获得最佳结果,请确保自定义损失函数内的所有计算(即自定义损失 class 的 call()
方法)均使用 TensorFlow 运算符完成,并且所有输入输出数据表示为 TF 张量。