TensorFlow 扩展 | Trainer 从 GenericExecutor 和 Keras 模型开始并不热
TensorFlow Extended | Trainer Not Warm Starting With GenericExecutor & Keras Model
我目前正在尝试让 TFX 管道的 Trainer 组件从同一管道的前一个 运行 热启动。用例是:
- 运行流水线一次,出模型
- 随着新数据的到来,使用新数据训练现有模型。
我知道 ResolverNode
组件就是为此目的而设计的,因此您可以在下面看到我如何使用它:
# detect the previously trained model
latest_model_resolver = ResolverNode(
instance_name='latest_model_resolver',
resolver_class=latest_artifacts_resolver.LatestArtifactsResolver,
latest_model=Channel(type=Model))
context.run(latest_model_resolver)
# set prior model as base_model
train_file = 'tfx_modules/recommender_train.py'
trainer = Trainer(
module_file=os.path.abspath(train_file),
custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
transformed_examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=10000),
eval_args=trainer_pb2.EvalArgs(num_steps=5000),
base_model=latest_model_resolver.outputs['latest_model'])
上面的组件 运行 成功,并且 ResolverNode
能够从先前的管道 运行 中检测到最新模型。不会抛出任何错误 - 但是,当 运行ning context.run(trainer)
时,模型损失基本上从第一次开始的地方开始。在模型的第一个 运行 之后,它完成了约 0.1 的训练损失,但是,在第二个 运行(假定的热启动)时,它重新开始约 18.2.
这让我相信所有权重都已重新初始化,我认为这不应该发生。下面是相关的模型构造函数:
def build_keras_model():
"""build keras model"""
embedding_max_values = load(open(os.path.abspath('tfx-example/user_artifacts/embedding_max_dict.pkl'), 'rb'))
embedding_dimensions = dict([(key, 20) for key in embedding_max_values.keys()])
embedding_pairs = [recommender.EmbeddingPair(embedding_name=feature,
embedding_dimension=embedding_dimensions[feature],
embedding_max_val=embedding_max_values[feature])
for feature in recommender_constants.univalent_features]
numeric_inputs = []
for num_feature in recommender_constants.numeric_features:
numeric_inputs.append(keras.Input(shape=(1,), name=num_feature))
input_layers = numeric_inputs + [elem for pair in embedding_pairs for elem in pair.input_layers]
pre_concat_layers = numeric_inputs + [elem for pair in embedding_pairs for elem in pair.embedding_layers]
concat = keras.layers.Concatenate()(pre_concat_layers) if len(pre_concat_layers) > 1 else pre_concat_layers[0]
layer_1 = keras.layers.Dense(64, activation='relu', name='layer1')(concat)
output = keras.layers.Dense(1, kernel_initializer='lecun_uniform', name='out')(layer_1)
model = keras.models.Model(input_layers, outputs=output)
model.compile(optimizer='adam', loss='mean_squared_error')
return model
def run_fn(fn_args: TrainerFnArgs):
"""function for the Trainer component"""
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
tf_transform_output, 40)
eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
tf_transform_output, 40)
model = build_keras_model()
tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=fn_args.model_run_dir, update_freq='epoch', histogram_freq=1,
write_images=True)
model.fit(train_dataset, steps_per_epoch=fn_args.train_steps, validation_data=eval_dataset,
validation_steps=fn_args.eval_steps, callbacks=[tensorboard_callback],
epochs=5)
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples')
)
}
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
为了研究这个问题,我仔细阅读了:
来自 TFX 的热启动示例
https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_warmstart.py
但是,本指南使用 Estimator
组件而不是 Keras 组件。该组件有一个 warm_start_from
初始化参数,我找不到 Keras 等效项。
我怀疑:
或者热启动功能仅对Estimator
组件可用,即使为Keras组件设置base_model
也不会生效。
即使在成功加载之前的模型之后,我也会以某种方式告诉模型重新初始化权重 - 在这种情况下,我希望得到一个关于发生位置的指针。
任何帮助都会很棒!非常感谢。
对于 Keras 模型,您必须首先使用基本模型路径加载模型,然后您可以从那里继续训练,而不是构建新模型。
您的 Trainer 组件看起来正确,但在 run_fn
中改为执行以下操作:
def run_fn(fn_args: FnArgs):
model = tf.keras.models.load_model(fn_args.base_model)
model.fit(train_dataset, steps_per_epoch=fn_args.train_steps, validation_data=eval_dataset,
validation_steps=fn_args.eval_steps, callbacks=[tensorboard_callback],
epochs=5)
我目前正在尝试让 TFX 管道的 Trainer 组件从同一管道的前一个 运行 热启动。用例是:
- 运行流水线一次,出模型
- 随着新数据的到来,使用新数据训练现有模型。
我知道 ResolverNode
组件就是为此目的而设计的,因此您可以在下面看到我如何使用它:
# detect the previously trained model
latest_model_resolver = ResolverNode(
instance_name='latest_model_resolver',
resolver_class=latest_artifacts_resolver.LatestArtifactsResolver,
latest_model=Channel(type=Model))
context.run(latest_model_resolver)
# set prior model as base_model
train_file = 'tfx_modules/recommender_train.py'
trainer = Trainer(
module_file=os.path.abspath(train_file),
custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
transformed_examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=10000),
eval_args=trainer_pb2.EvalArgs(num_steps=5000),
base_model=latest_model_resolver.outputs['latest_model'])
上面的组件 运行 成功,并且 ResolverNode
能够从先前的管道 运行 中检测到最新模型。不会抛出任何错误 - 但是,当 运行ning context.run(trainer)
时,模型损失基本上从第一次开始的地方开始。在模型的第一个 运行 之后,它完成了约 0.1 的训练损失,但是,在第二个 运行(假定的热启动)时,它重新开始约 18.2.
这让我相信所有权重都已重新初始化,我认为这不应该发生。下面是相关的模型构造函数:
def build_keras_model():
"""build keras model"""
embedding_max_values = load(open(os.path.abspath('tfx-example/user_artifacts/embedding_max_dict.pkl'), 'rb'))
embedding_dimensions = dict([(key, 20) for key in embedding_max_values.keys()])
embedding_pairs = [recommender.EmbeddingPair(embedding_name=feature,
embedding_dimension=embedding_dimensions[feature],
embedding_max_val=embedding_max_values[feature])
for feature in recommender_constants.univalent_features]
numeric_inputs = []
for num_feature in recommender_constants.numeric_features:
numeric_inputs.append(keras.Input(shape=(1,), name=num_feature))
input_layers = numeric_inputs + [elem for pair in embedding_pairs for elem in pair.input_layers]
pre_concat_layers = numeric_inputs + [elem for pair in embedding_pairs for elem in pair.embedding_layers]
concat = keras.layers.Concatenate()(pre_concat_layers) if len(pre_concat_layers) > 1 else pre_concat_layers[0]
layer_1 = keras.layers.Dense(64, activation='relu', name='layer1')(concat)
output = keras.layers.Dense(1, kernel_initializer='lecun_uniform', name='out')(layer_1)
model = keras.models.Model(input_layers, outputs=output)
model.compile(optimizer='adam', loss='mean_squared_error')
return model
def run_fn(fn_args: TrainerFnArgs):
"""function for the Trainer component"""
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
tf_transform_output, 40)
eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
tf_transform_output, 40)
model = build_keras_model()
tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=fn_args.model_run_dir, update_freq='epoch', histogram_freq=1,
write_images=True)
model.fit(train_dataset, steps_per_epoch=fn_args.train_steps, validation_data=eval_dataset,
validation_steps=fn_args.eval_steps, callbacks=[tensorboard_callback],
epochs=5)
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples')
)
}
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
为了研究这个问题,我仔细阅读了:
来自 TFX 的热启动示例 https://github.com/tensorflow/tfx/blob/master/tfx/examples/chicago_taxi_pipeline/taxi_pipeline_warmstart.py
但是,本指南使用 Estimator
组件而不是 Keras 组件。该组件有一个 warm_start_from
初始化参数,我找不到 Keras 等效项。
我怀疑:
或者热启动功能仅对
Estimator
组件可用,即使为Keras组件设置base_model
也不会生效。即使在成功加载之前的模型之后,我也会以某种方式告诉模型重新初始化权重 - 在这种情况下,我希望得到一个关于发生位置的指针。
任何帮助都会很棒!非常感谢。
对于 Keras 模型,您必须首先使用基本模型路径加载模型,然后您可以从那里继续训练,而不是构建新模型。
您的 Trainer 组件看起来正确,但在 run_fn
中改为执行以下操作:
def run_fn(fn_args: FnArgs):
model = tf.keras.models.load_model(fn_args.base_model)
model.fit(train_dataset, steps_per_epoch=fn_args.train_steps, validation_data=eval_dataset,
validation_steps=fn_args.eval_steps, callbacks=[tensorboard_callback],
epochs=5)