mlflow.exceptions.MlflowException: 不允许更改参数值。 key='input_rows' 的 Param 已经用 value='32205' 记录

mlflow.exceptions.MlflowException: Changing param values is not allowed. Param with key='input_rows' was already logged with value='32205'

我正在使用 Mlflow 作为工作编排工具。我有一个机器学习管道。在此管道中,我有 real-time 数据。我正在使用 Apache Kafka 收听这些数据。此外,我正在这样做:每当有 250 条消息涉及此主题时,我都会收集它们,并将此消息附加到我以前的数据中。之后,我的培训功能被触发。因此,我能够在每 250 个新数据中进行新的训练。使用 Mlflow,我可以显示训练模型的结果、指标和任何其他参数。但是在训练发生了一次之后,第二次没有发生,它抛出了我在标题中显示的这个错误。这是我的消费者:

topic_name = 'twitterdata'
train_every = 250


def consume_tweets():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['localhost:9093'],
        auto_offset_reset='latest',
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,
        fetch_max_bytes=128,
        max_poll_records=100,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    tweet_counter = 0
    for message in consumer:
        tweets = json.loads(json.dumps(message.value))
        # print(tweets['text'])
        tweet_sentiment = make_prediction(tweets['text'])

        if tweet_counter == train_every:
            update_df()
            data_path = 'data/updated_tweets.csv'
            train(data_path)
            print("\nTraining with new data is completed!\n")
            tweet_counter = 0

        else:
            tweet_counter += 1

        publish_prediction(tweet_sentiment, tweets['text'])

这是我的 train.py:

train_tweets = pd.read_csv(DATA_PATH)
    # train_tweets = train_tweets[:20000]

    tweets = train_tweets.tweet.values
    labels = train_tweets.label.values

    # Log data params
    mlflow.log_param('input_rows', train_tweets.shape[0])

    # Do preprocessing and return vectorizer with it
    vectorizer, processed_features = embedding(tweets)

    # Saving vectorizer
    save_vectorizer(vectorizer)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(processed_features, labels, test_size=0.2, random_state=0)

    # Handle imbalanced data by using 'Smote' and log to Mlflow
    smote = SMOTE('minority')
    mlflow.log_param("over-sampling", smote)

    X_train, y_train = smote.fit_sample(X_train, y_train)

    # text_classifier = MultinomialNB()
    text_classifier = LogisticRegression(max_iter=10000)
    text_classifier.fit(X_train, y_train)
    predictions = text_classifier.predict(X_test)

    # Model metrics
    (rmse, mae, r2) = eval_metrics(y_test, predictions)

    mlflow.log_param('os-row-Xtrain', X_train.shape[0])
    mlflow.log_param('os-row-ytrain', y_train.shape[0])
    mlflow.log_param("model_name", text_classifier)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)
    mlflow.log_metric('acc_score', accuracy_score(y_test, predictions))

    mlflow.sklearn.log_model(text_classifier, "model")

我无法解决问题。 MLflow 是最新的工具之一,所以 MLflow 的问题和例子很少。

我认为您需要为每批新数据使用 MLflow“运行”,以便为每次新训练独立记录您的参数。

因此,请在您的消费者中尝试以下操作:

if tweet_counter == train_every:
            update_df()
            data_path = 'data/updated_tweets.csv'
            with mlflow.start_run() as mlrun:
               train(data_path)
            print("\nTraining with new data is completed!\n")
            tweet_counter = 0

我已经添加了

mlflow.set_experiment(experiment_name="name_experiment")
with mlflow.start_run() as mlrun:
    mlflow.log_param("epoch",args.epochs)

    for epoch in range(start_epoch, args.epochs):

        #code

mlflow.end_run()

这解决了我的问题