keras自定义层中的持久变量
Persistent Variable in keras Custom Layer
我想写一个自定义层,我可以在 运行 秒之间在内存中保存一个变量。
例如,
class MyLayer(Layer):
def __init__(self, out_dim = 51, **kwargs):
self.out_dim = out_dim
super(MyLayer, self).__init__(**kwargs)
def build(self, input_shape):
a = 0.0
self.persistent_variable = K.variable(a)
self.built = True
def get_output_shape_for(self, input_shape):
return (input_shape[0], 1)
def call(self, x, mask=None):
a = K.eval(self.persistent_variable) + 1
K.set_value(self.persistent_variable, a)
return self.persistent_variable
m = Sequential()
m.add(MyLayer(input_shape=(1,)))
当我 运行 m.predict
时,我希望 persistent_variable
得到更新,并打印增量值。
但它看起来总是打印 0
# Dummy input
x = np.zeros(1)
m.predict(x, batch_size=1)
我的问题是,如何在 m.predict
的每个 运行 之后增加并保存 persistent_variable
谢谢,
纳文
诀窍是你必须在调用函数中调用 self.add_update(...)
来注册一个函数,每次评估模型时都会调用该函数(我通过深入研究有状态 rnns 的源代码发现了这一点).如果你这样做 self.stateful = True
它会为每次训练和预测调用调用你的自定义更新函数,否则它只会在训练期间调用它。例如:
import keras.backend as K
import numpy as np
from keras.engine.topology import Layer
class CounterLayer(Layer):
def __init__(self, stateful=False,**kwargs):
self.stateful = stateful # True means it will increment counter on predict and train, false means it will only increment counter on train
super(CounterLayer, self).__init__(**kwargs)
def build(self, input_shape):
# Define variables in build
self.count = K.variable(0, name="count")
super(CounterLayer, self).build(input_shape)
def call(self, x, mask=None):
updates = []
# The format is (variable, value setting to)
# So this says
# self.pos = self.pos + 1
updates.append((self.count, self.count+1))
# You can append more updates to this list or call add_update more
# times if you want
# Add our custom update
# We stick x here so it calls our update function every time our layer
# is given a new x
self.add_update(updates, x)
# This will be an identity layer but keras gets mad for some reason
# if you just output x so we'll multiply it by 1 so it thinks it is a
# "new variable"
return self.count
# in newer keras versions you might need to name this compute_output_shape instead
def get_output_shape_for(self, input_shape):
# We will just return our count as an array ([[count]])
return (1,1)
def reset_states(self):
self.count.set_value(0)
用法示例:
from keras.layers import Input
from keras.models import Model
from keras.optimizers import RMSprop
inputLayer = Input(shape=(10,))
counter = CounterLayer() # Don't update on predict
# counter = CounterLayer(stateful=True) # This will update each time you call predict
counterLayer = counter(inputLayer)
model = Model(input=inputLayer, output=counterLayer)
optimizer = RMSprop(lr=0.001)
model.compile(loss="mse", optimizer=optimizer)
# See the value of our counter
print counter.count.get_value()
# This won't actually train anything but each epoch will update our counter
# Note that if you say have a batch size of 5, update will be called 5 times per epoch
model.fit(np.zeros([1, 10]), np.array([0]), batch_size=1, nb_epoch=5)
# The value of our counter has now changed
print counter.count.get_value()
model.predict(np.zeros([1, 10]))
# If we did stateful=False, this didn't change, otherwise it did
print counter.count.get_value()
需要使用 tf_state_ops.assign()
或 tf.compat.v1.scatter_update()
来实现此功能。
下面是一个使用 tf_state_ops.assign()
.
的例子
import tensorflow as tf
import tensorflow.keras.layers as KL
import tensorflow_probability as tfp
from tensorflow.python.ops import state_ops as tf_state_ops
class CustomLayer(KL.Layer):
"""custom layer for storing moving average of nth percentile of some values"""
def __init__(
self,
percentile: float = 66.67,
name: str = "thresh",
alpha: float = 0.9,
moving_thresh_initializer: float = 0.0,
**kwargs
):
"""Layer initialization
Args:
percentile (float, optional): percentile for thresholding. Defaults to 66.67.
name (str, optional): name for the tensor. Defaults to "thresh".
alpha (float, optional): decay value for moving average. Defaults to 0.9.
moving_thresh_initializer (float, optional): Initial threshold. Defaults to 0.0
"""
super().__init__(trainable=False, name=name, **kwargs)
self.percentile = percentile
self.moving_thresh_initializer = tf.constant_initializer(
value=moving_thresh_initializer
)
self.alpha = alpha
def build(self, input_shape):
"""build the layer"""
shape = ()
self.moving_thresh = self.add_weight(
shape=shape,
name="moving_thresh",
initializer=self.moving_thresh_initializer,
trainable=False,
)
return super().build(input_shape)
def call(self, inputs: tf.Tensor) -> tf.Tensor:
"""call method on the layer
Args:
inputs (tf.Tensor): samplewise values for a given batch
Returns:
tf.Tensor (shape = ()): threshold value
"""
batch_thresh = tfp.stats.percentile(
inputs, q=self.percentile, axis=[0], interpolation="linear"
)
self.moving_thresh = tf_state_ops.assign(
self.moving_thresh,
self.alpha * self.moving_thresh + (1.0 - self.alpha) * batch_loss_thresh,
# use_locking=self._use_locking,
)
return self.moving_thresh
def get_config(self) -> dict:
"""Setting up the layer config
Returns:
dict: config key-value pairs
"""
base_config = super().get_config()
config = {
"alpha": self.alpha,
"moving_thresh_initializer": self.moving_thresh_initializer,
"percentile": self.percentile,
"threshhold": self.moving_thresh,
}
return dict(list(base_config.items()) + list(config.items()))
def compute_output_shape(self, input_shape: tuple) -> tuple:
"""shape of the layer output"""
return ()
以上自定义层可以按如下方式包含在工作流中:
thresholding_layer = CustomLayer()
# Dummy input
x = np.zeros((batch_size, 1))
current_threshold = thresholding_layer(x)
有关使用上述自定义层的更多详细信息以及 tf.compat.v1.scatter_update()
的用法,您可以查看以下内容 link。
https://medium.com/dive-into-ml-ai/custom-layer-with-memory-in-keras-1d0c03e722e9
我想写一个自定义层,我可以在 运行 秒之间在内存中保存一个变量。 例如,
class MyLayer(Layer):
def __init__(self, out_dim = 51, **kwargs):
self.out_dim = out_dim
super(MyLayer, self).__init__(**kwargs)
def build(self, input_shape):
a = 0.0
self.persistent_variable = K.variable(a)
self.built = True
def get_output_shape_for(self, input_shape):
return (input_shape[0], 1)
def call(self, x, mask=None):
a = K.eval(self.persistent_variable) + 1
K.set_value(self.persistent_variable, a)
return self.persistent_variable
m = Sequential()
m.add(MyLayer(input_shape=(1,)))
当我 运行 m.predict
时,我希望 persistent_variable
得到更新,并打印增量值。
但它看起来总是打印 0
# Dummy input
x = np.zeros(1)
m.predict(x, batch_size=1)
我的问题是,如何在 m.predict
persistent_variable
谢谢, 纳文
诀窍是你必须在调用函数中调用 self.add_update(...)
来注册一个函数,每次评估模型时都会调用该函数(我通过深入研究有状态 rnns 的源代码发现了这一点).如果你这样做 self.stateful = True
它会为每次训练和预测调用调用你的自定义更新函数,否则它只会在训练期间调用它。例如:
import keras.backend as K
import numpy as np
from keras.engine.topology import Layer
class CounterLayer(Layer):
def __init__(self, stateful=False,**kwargs):
self.stateful = stateful # True means it will increment counter on predict and train, false means it will only increment counter on train
super(CounterLayer, self).__init__(**kwargs)
def build(self, input_shape):
# Define variables in build
self.count = K.variable(0, name="count")
super(CounterLayer, self).build(input_shape)
def call(self, x, mask=None):
updates = []
# The format is (variable, value setting to)
# So this says
# self.pos = self.pos + 1
updates.append((self.count, self.count+1))
# You can append more updates to this list or call add_update more
# times if you want
# Add our custom update
# We stick x here so it calls our update function every time our layer
# is given a new x
self.add_update(updates, x)
# This will be an identity layer but keras gets mad for some reason
# if you just output x so we'll multiply it by 1 so it thinks it is a
# "new variable"
return self.count
# in newer keras versions you might need to name this compute_output_shape instead
def get_output_shape_for(self, input_shape):
# We will just return our count as an array ([[count]])
return (1,1)
def reset_states(self):
self.count.set_value(0)
用法示例:
from keras.layers import Input
from keras.models import Model
from keras.optimizers import RMSprop
inputLayer = Input(shape=(10,))
counter = CounterLayer() # Don't update on predict
# counter = CounterLayer(stateful=True) # This will update each time you call predict
counterLayer = counter(inputLayer)
model = Model(input=inputLayer, output=counterLayer)
optimizer = RMSprop(lr=0.001)
model.compile(loss="mse", optimizer=optimizer)
# See the value of our counter
print counter.count.get_value()
# This won't actually train anything but each epoch will update our counter
# Note that if you say have a batch size of 5, update will be called 5 times per epoch
model.fit(np.zeros([1, 10]), np.array([0]), batch_size=1, nb_epoch=5)
# The value of our counter has now changed
print counter.count.get_value()
model.predict(np.zeros([1, 10]))
# If we did stateful=False, this didn't change, otherwise it did
print counter.count.get_value()
需要使用 tf_state_ops.assign()
或 tf.compat.v1.scatter_update()
来实现此功能。
下面是一个使用 tf_state_ops.assign()
.
import tensorflow as tf
import tensorflow.keras.layers as KL
import tensorflow_probability as tfp
from tensorflow.python.ops import state_ops as tf_state_ops
class CustomLayer(KL.Layer):
"""custom layer for storing moving average of nth percentile of some values"""
def __init__(
self,
percentile: float = 66.67,
name: str = "thresh",
alpha: float = 0.9,
moving_thresh_initializer: float = 0.0,
**kwargs
):
"""Layer initialization
Args:
percentile (float, optional): percentile for thresholding. Defaults to 66.67.
name (str, optional): name for the tensor. Defaults to "thresh".
alpha (float, optional): decay value for moving average. Defaults to 0.9.
moving_thresh_initializer (float, optional): Initial threshold. Defaults to 0.0
"""
super().__init__(trainable=False, name=name, **kwargs)
self.percentile = percentile
self.moving_thresh_initializer = tf.constant_initializer(
value=moving_thresh_initializer
)
self.alpha = alpha
def build(self, input_shape):
"""build the layer"""
shape = ()
self.moving_thresh = self.add_weight(
shape=shape,
name="moving_thresh",
initializer=self.moving_thresh_initializer,
trainable=False,
)
return super().build(input_shape)
def call(self, inputs: tf.Tensor) -> tf.Tensor:
"""call method on the layer
Args:
inputs (tf.Tensor): samplewise values for a given batch
Returns:
tf.Tensor (shape = ()): threshold value
"""
batch_thresh = tfp.stats.percentile(
inputs, q=self.percentile, axis=[0], interpolation="linear"
)
self.moving_thresh = tf_state_ops.assign(
self.moving_thresh,
self.alpha * self.moving_thresh + (1.0 - self.alpha) * batch_loss_thresh,
# use_locking=self._use_locking,
)
return self.moving_thresh
def get_config(self) -> dict:
"""Setting up the layer config
Returns:
dict: config key-value pairs
"""
base_config = super().get_config()
config = {
"alpha": self.alpha,
"moving_thresh_initializer": self.moving_thresh_initializer,
"percentile": self.percentile,
"threshhold": self.moving_thresh,
}
return dict(list(base_config.items()) + list(config.items()))
def compute_output_shape(self, input_shape: tuple) -> tuple:
"""shape of the layer output"""
return ()
以上自定义层可以按如下方式包含在工作流中:
thresholding_layer = CustomLayer()
# Dummy input
x = np.zeros((batch_size, 1))
current_threshold = thresholding_layer(x)
有关使用上述自定义层的更多详细信息以及 tf.compat.v1.scatter_update()
的用法,您可以查看以下内容 link。
https://medium.com/dive-into-ml-ai/custom-layer-with-memory-in-keras-1d0c03e722e9