How to log a KerasClassifier model in a sklearn pipeline with MLflow?

I have a set of preprocessing stages and a KerasClassifier estimator (from tensorflow.keras.wrappers.scikit_learn import KerasClassifier) in a sklearn Pipeline.

My overall goal is to tune and log the whole sklearn pipeline in MLflow (in a Databricks environment). I am getting a confusing TypeError that I don't know how to resolve:

TypeError: can't pickle _thread.RLock objects

I have the following code (without the tuning stage), which returns the error above:

import cloudpickle
import numpy as np
import sklearn
import tensorflow as tf
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
from sklearn.pipeline import Pipeline
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        "cloudpickle=={}".format(cloudpickle.__version__),
        "scikit-learn=={}".format(sklearn.__version__),
        "numpy=={}".format(np.__version__),
        "tensorflow=={}".format(tf.__version__),
    ],
    additional_conda_channels=None,
)

search_space = {
    "estimator__dense_l1": 20,
    "estimator__dense_l2": 20,
    "estimator__learning_rate": 0.1,
    "estimator__optimizer": "Adam",
}


def create_model(n):

    model = Sequential()
    model.add(Dense(int(n["estimator__dense_l1"]), activation="relu"))
    model.add(Dense(int(n["estimator__dense_l2"]), activation="relu"))
    model.add(Dense(1, activation="sigmoid"))
    model.compile(
        loss="binary_crossentropy",
        optimizer=n["estimator__optimizer"],
        metrics=["accuracy"],
    )

    return model


mlflow.sklearn.autolog()
with mlflow.start_run(nested=True) as run:

    classifier = KerasClassifier(build_fn=create_model, n=search_space)
    # fit the pipeline
    clf = Pipeline(steps=[("preprocessor", preprocessor),
                          ("estimator", classifier)])
    h = clf.fit(
        X_train,
        y_train.values,
        estimator__validation_split=0.2,
        estimator__epochs=10,
        estimator__verbose=2,
    )

    # log scores
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    signature = infer_signature(X_test, clf.predict(X_test))
    # Log the model with a signature that defines the schema of the model's inputs and outputs.
    mlflow.sklearn.log_model(
        sk_model=clf, artifact_path="model", 
        signature=signature, 
        conda_env=conda_env
    )

I also get this warning before the error:


    WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
                      transformer_weights=None,
                      transformers=[('num',
                                   Pipeline(memory=None,

Note that the whole pipeline runs fine outside of MLflow. Can anyone help?
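As far as I can tell, mlflow.sklearn.log_model serializes the whole estimator with (cloud)pickle, so the same failure can be reproduced without MLflow by pickling the fitted pipeline directly. A minimal sketch, assuming clf has been fit as in the snippet above:

import pickle

# The legacy tensorflow.keras.wrappers.scikit_learn.KerasClassifier keeps a compiled
# tf.keras model on the fitted estimator, which is what fails to serialize here.
pickle.dumps(clf)  # raises TypeError: can't pickle _thread.RLock objects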

I think I have found a workaround/solution for now, but I think this issue needs to be addressed in MLflow anyway.

What I did is probably not the best approach. I used a Python package called scikeras to do the wrapping, and then the model can be logged.

Code:

import cloudpickle
import numpy as np
import sklearn
import scikeras
import tensorflow as tf
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
from sklearn.pipeline import Pipeline
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation

from scikeras.wrappers import KerasClassifier
  
 
# Simple pyfunc wrapper that holds the fitted sklearn pipeline and delegates predict().
class ModelWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        return self.model.predict(model_input)
 
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        "cloudpickle=={}".format(cloudpickle.__version__),
        "scikit-learn=={}".format(sklearn.__version__),
        "numpy=={}".format(np.__version__),
        "tensorflow=={}".format(tf.__version__),
        "scikeras=={}".format(scikeras.__version__),
    ],
    additional_conda_channels=None,
)
 
param = {
    "dense_l1": 20,
    "dense_l2": 20,
    "optimizer__learning_rate": 0.1,
    "optimizer": "Adam",
    "loss": "binary_crossentropy",
}
 
  
def create_model(dense_l1, dense_l2, meta): 
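  # Note (assumption, based on the scikeras docs): `meta` is populated by scikeras at
  # fit time, and the model is intentionally returned uncompiled; scikeras compiles it
  # using the `loss`/`optimizer` parameters passed to KerasClassifier below.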
  
  n_features_in_ = meta["n_features_in_"] 
  X_shape_ = meta["X_shape_"] 
  n_classes_ = meta["n_classes_"] 
 
  model = Sequential() 
  model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu")) 
  model.add(Dense(dense_l1, activation="relu")) 
  model.add(Dense(dense_l2, activation="relu")) 
  model.add(Dense(1, activation="sigmoid")) 
 
  return model   
 
mlflow.sklearn.autolog() 
with mlflow.start_run(run_name="sample_run"): 
 
  classifier = KerasClassifier(
      create_model,
      loss=param["loss"],
      dense_l1=param["dense_l1"],
      dense_l2=param["dense_l2"],
      optimizer__learning_rate=param["optimizer__learning_rate"],
      optimizer=param["optimizer"],
  )

  # fit the pipeline
  clf = Pipeline(steps=[('preprocessor', preprocessor),
                        ('estimator', classifier)])
 
  h = clf.fit(X_train, y_train.values) 
  # log scores 
  acc_score = clf.score(X=X_test, y=y_test) 
  mlflow.log_metric("accuracy", acc_score) 
  signature = infer_signature(X_test, clf.predict(X_test)) 
  model_nn = ModelWrapper(clf)

  mlflow.pyfunc.log_model(
      python_model=model_nn,
      artifact_path="model",
      signature=signature,
      conda_env=conda_env,
  )
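
To double-check that the logged artifact works, the pyfunc model can be loaded back and used for inference. A minimal sketch; `run_id` is a placeholder for the id of the run created above (visible in the MLflow UI):

# Load the logged pyfunc model back and predict; ModelWrapper.predict() is called
# under the hood. `run_id` is a placeholder for the actual MLflow run id.
run_id = "<run_id>"
loaded_model = mlflow.pyfunc.load_model("runs:/{}/model".format(run_id))
preds = loaded_model.predict(X_test)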