如何在 sklearn 管道 mlflow 中记录 KerasClassifier 模型?
how to log KerasClassifier model in a sklearn pipeline mlflow?
我在 sklearn Pipeline
中有一组预处理阶段和一个 KerasClassifier
(from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
).
的估计器
我的总体目标是在 mlflow
(在 databricks evn 中)中调整和记录整个 sklearn 管道。我收到一个令人困惑的类型错误,我不知道如何重新爱:
TypeError: can't pickle _thread.RLock objects
我有以下代码(没有调整阶段)returns 上面的错误:
conda_env = _mlflow_conda_env(
additional_conda_deps=None,
additional_pip_deps=[
"cloudpickle=={}".format(cloudpickle.__version__),
"scikit-learn=={}".format(sklearn.__version__),
"numpy=={}".format(np.__version__),
"tensorflow=={}".format(tf.__version__),
],
additional_conda_channels=None,
)
search_space = {
"estimator__dense_l1": 20,
"estimator__dense_l2": 20,
"estimator__learning_rate": 0.1,
"estimator__optimizer": "Adam",
}
def create_model(n):
model = Sequential()
model.add(Dense(int(n["estimator__dense_l1"]), activation="relu"))
model.add(Dense(int(n["estimator__dense_l2"]), activation="relu"))
model.add(Dense(1, activation="sigmoid"))
model.compile(
loss="binary_crossentropy",
optimizer=n["estimator__optimizer"],
metrics=["accuracy"],
)
return model
mlflow.sklearn.autolog()
with mlflow.start_run(nested=True) as run:
classfier = KerasClassifier(build_fn=create_model, n=search_space)
# fit the pipeline
clf = Pipeline(steps=[("preprocessor", preprocessor),
("estimator", classfier)])
h = clf.fit(
X_train,
y_train.values,
estimator__validation_split=0.2,
estimator__epochs=10,
estimator__verbose=2,
)
# log scores
acc_score = clf.score(X=X_test, y=y_test)
mlflow.log_metric("accuracy", acc_score)
signature = infer_signature(X_test, clf.predict(X_test))
# Log the model with a signature that defines the schema of the model's inputs and outputs.
mlflow.sklearn.log_model(
sk_model=clf, artifact_path="model",
signature=signature,
conda_env=conda_env
)
我在错误之前也收到了这个警告:
WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
transformer_weights=None,
transformers=[('num',
Pipeline(memory=None,
请注意整个管道在 mlflow 之外运行。
有人可以帮忙吗?
我想我现在找到了某种 workaround/solution,但我认为这个问题无论如何都需要在 MLFloow 中解决。
我做的可能不是最好的方法。
我使用了一个名为 scikeras 的 python 包来进行包装,然后可以记录模型
代码:
import scikeras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation
from scikeras.wrappers import KerasClassifier
class ModelWrapper(mlflow.pyfunc.PythonModel):
def __init__(self, model):
self.model = model
def predict(self, context, model_input):
return self.model.predict(model_input)
conda_env = _mlflow_conda_env(
additional_conda_deps=None,
additional_pip_deps=[
"cloudpickle=={}".format(cloudpickle.__version__),
"scikit-learn=={}".format(sklearn.__version__),
"numpy=={}".format(np.__version__),
"tensorflow=={}".format(tf.__version__),
"scikeras=={}".format(scikeras.__version__),
],
additional_conda_channels=None,
)
param = {
"dense_l1": 20,
"dense_l2": 20,
"optimizer__learning_rate": 0.1,
"optimizer": "Adam",
"loss":"binary_crossentropy",
}
def create_model(dense_l1, dense_l2, meta):
n_features_in_ = meta["n_features_in_"]
X_shape_ = meta["X_shape_"]
n_classes_ = meta["n_classes_"]
model = Sequential()
model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
model.add(Dense(dense_l1, activation="relu"))
model.add(Dense(dense_l2, activation="relu"))
model.add(Dense(1, activation="sigmoid"))
return model
mlflow.sklearn.autolog()
with mlflow.start_run(run_name="sample_run"):
classfier = KerasClassifier(
create_model,
loss=param["loss"],
dense_l1=param["dense_l1"],
dense_l2=param["dense_l2"],
optimizer__learning_rate = param["optimizer__learning_rate"],
optimizer= param["optimizer"],
)
# fit the pipeline
clf = Pipeline(steps=[('preprocessor', preprocessor),
('estimator', classfier)])
h = clf.fit(X_train, y_train.values)
# log scores
acc_score = clf.score(X=X_test, y=y_test)
mlflow.log_metric("accuracy", acc_score)
signature = infer_signature(X_test, clf.predict(X_test))
model_nn = ModelWrapper(clf,)
mlflow.pyfunc.log_model(
python_model= model_nn,
artifact_path = "model",
signature = signature,
conda_env = conda_env
)
我在 sklearn Pipeline
中有一组预处理阶段和一个 KerasClassifier
(from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
).
我的总体目标是在 mlflow
(在 databricks evn 中)中调整和记录整个 sklearn 管道。我收到一个令人困惑的类型错误,我不知道如何重新爱:
TypeError: can't pickle _thread.RLock objects
我有以下代码(没有调整阶段)returns 上面的错误:
conda_env = _mlflow_conda_env(
additional_conda_deps=None,
additional_pip_deps=[
"cloudpickle=={}".format(cloudpickle.__version__),
"scikit-learn=={}".format(sklearn.__version__),
"numpy=={}".format(np.__version__),
"tensorflow=={}".format(tf.__version__),
],
additional_conda_channels=None,
)
search_space = {
"estimator__dense_l1": 20,
"estimator__dense_l2": 20,
"estimator__learning_rate": 0.1,
"estimator__optimizer": "Adam",
}
def create_model(n):
model = Sequential()
model.add(Dense(int(n["estimator__dense_l1"]), activation="relu"))
model.add(Dense(int(n["estimator__dense_l2"]), activation="relu"))
model.add(Dense(1, activation="sigmoid"))
model.compile(
loss="binary_crossentropy",
optimizer=n["estimator__optimizer"],
metrics=["accuracy"],
)
return model
mlflow.sklearn.autolog()
with mlflow.start_run(nested=True) as run:
classfier = KerasClassifier(build_fn=create_model, n=search_space)
# fit the pipeline
clf = Pipeline(steps=[("preprocessor", preprocessor),
("estimator", classfier)])
h = clf.fit(
X_train,
y_train.values,
estimator__validation_split=0.2,
estimator__epochs=10,
estimator__verbose=2,
)
# log scores
acc_score = clf.score(X=X_test, y=y_test)
mlflow.log_metric("accuracy", acc_score)
signature = infer_signature(X_test, clf.predict(X_test))
# Log the model with a signature that defines the schema of the model's inputs and outputs.
mlflow.sklearn.log_model(
sk_model=clf, artifact_path="model",
signature=signature,
conda_env=conda_env
)
我在错误之前也收到了这个警告:
WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
transformer_weights=None,
transformers=[('num',
Pipeline(memory=None,
请注意整个管道在 mlflow 之外运行。 有人可以帮忙吗?
我想我现在找到了某种 workaround/solution,但我认为这个问题无论如何都需要在 MLFloow 中解决。
我做的可能不是最好的方法。 我使用了一个名为 scikeras 的 python 包来进行包装,然后可以记录模型
代码:
import scikeras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation
from scikeras.wrappers import KerasClassifier
class ModelWrapper(mlflow.pyfunc.PythonModel):
def __init__(self, model):
self.model = model
def predict(self, context, model_input):
return self.model.predict(model_input)
conda_env = _mlflow_conda_env(
additional_conda_deps=None,
additional_pip_deps=[
"cloudpickle=={}".format(cloudpickle.__version__),
"scikit-learn=={}".format(sklearn.__version__),
"numpy=={}".format(np.__version__),
"tensorflow=={}".format(tf.__version__),
"scikeras=={}".format(scikeras.__version__),
],
additional_conda_channels=None,
)
param = {
"dense_l1": 20,
"dense_l2": 20,
"optimizer__learning_rate": 0.1,
"optimizer": "Adam",
"loss":"binary_crossentropy",
}
def create_model(dense_l1, dense_l2, meta):
n_features_in_ = meta["n_features_in_"]
X_shape_ = meta["X_shape_"]
n_classes_ = meta["n_classes_"]
model = Sequential()
model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
model.add(Dense(dense_l1, activation="relu"))
model.add(Dense(dense_l2, activation="relu"))
model.add(Dense(1, activation="sigmoid"))
return model
mlflow.sklearn.autolog()
with mlflow.start_run(run_name="sample_run"):
classfier = KerasClassifier(
create_model,
loss=param["loss"],
dense_l1=param["dense_l1"],
dense_l2=param["dense_l2"],
optimizer__learning_rate = param["optimizer__learning_rate"],
optimizer= param["optimizer"],
)
# fit the pipeline
clf = Pipeline(steps=[('preprocessor', preprocessor),
('estimator', classfier)])
h = clf.fit(X_train, y_train.values)
# log scores
acc_score = clf.score(X=X_test, y=y_test)
mlflow.log_metric("accuracy", acc_score)
signature = infer_signature(X_test, clf.predict(X_test))
model_nn = ModelWrapper(clf,)
mlflow.pyfunc.log_model(
python_model= model_nn,
artifact_path = "model",
signature = signature,
conda_env = conda_env
)