mlflow.exceptions.MlflowException: Changing param values is not allowed. Param with key='input_rows' was already logged with value='32205'
I am using MLflow as a workflow orchestration tool for a machine learning pipeline that works on real-time data. I listen to this data with Apache Kafka: every time 250 messages have arrived on the topic, I collect them, append them to my previous data, and then my training function is triggered, so a new training happens for every 250 new records. With MLflow I can display the trained model's results, metrics and any other parameters. However, after training has run once, the second training does not happen; it throws the error shown in the title. This is my consumer:
topic_name = 'twitterdata'
train_every = 250

def consume_tweets():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['localhost:9093'],
        auto_offset_reset='latest',
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,
        fetch_max_bytes=128,
        max_poll_records=100,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))

    tweet_counter = 0
    for message in consumer:
        tweets = json.loads(json.dumps(message.value))
        # print(tweets['text'])
        tweet_sentiment = make_prediction(tweets['text'])

        if tweet_counter == train_every:
            update_df()
            data_path = 'data/updated_tweets.csv'
            train(data_path)
            print("\nTraining with new data is completed!\n")
            tweet_counter = 0
        else:
            tweet_counter += 1

        publish_prediction(tweet_sentiment, tweets['text'])
This is my train.py:
train_tweets = pd.read_csv(DATA_PATH)
# train_tweets = train_tweets[:20000]
tweets = train_tweets.tweet.values
labels = train_tweets.label.values
# Log data params
mlflow.log_param('input_rows', train_tweets.shape[0])
# Do preprocessing and return vectorizer with it
vectorizer, processed_features = embedding(tweets)
# Saving vectorizer
save_vectorizer(vectorizer)
# Split data
X_train, X_test, y_train, y_test = train_test_split(processed_features, labels, test_size=0.2, random_state=0)
# Handle imbalanced data by using 'Smote' and log to Mlflow
smote = SMOTE('minority')
mlflow.log_param("over-sampling", smote)
X_train, y_train = smote.fit_sample(X_train, y_train)
# text_classifier = MultinomialNB()
text_classifier = LogisticRegression(max_iter=10000)
text_classifier.fit(X_train, y_train)
predictions = text_classifier.predict(X_test)
# Model metrics
(rmse, mae, r2) = eval_metrics(y_test, predictions)
mlflow.log_param('os-row-Xtrain', X_train.shape[0])
mlflow.log_param('os-row-ytrain', y_train.shape[0])
mlflow.log_param("model_name", text_classifier)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)
mlflow.log_metric('acc_score', accuracy_score(y_test, predictions))
mlflow.sklearn.log_model(text_classifier, "model")
I have not been able to solve this. MLflow is a fairly new tool, so there are still few questions and examples about it.
I think you need to start a separate MLflow run for each new batch of data, so that the parameters of each new training are logged independently. So try the following in your consumer:
if tweet_counter == train_every:
    update_df()
    data_path = 'data/updated_tweets.csv'
    with mlflow.start_run() as mlrun:
        train(data_path)
    print("\nTraining with new data is completed!\n")
    tweet_counter = 0
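To make this concrete, here is a minimal, self-contained sketch of the pattern (the train signature and the row counts below are made up for illustration, they are not your actual code): each training call opens its own run, so logging the same param key again no longer collides with the value from the previous training.

import mlflow

def train(input_rows):
    # A fresh run per training call: each batch's parameters live in
    # their own run, so logging 'input_rows' again does not clash with
    # the value recorded by the previous training.
    with mlflow.start_run():
        mlflow.log_param('input_rows', input_rows)
        # ... fit the model and log metrics here, as in your train.py ...

# Simulating two consecutive 250-message batches from the consumer:
train(32205)
train(32455)  # second training now succeeds, because it is a separate run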
I added the following:
mlflow.set_experiment(experiment_name="name_experiment")

with mlflow.start_run() as mlrun:
    mlflow.log_param("epoch", args.epochs)
    for epoch in range(start_epoch, args.epochs):
        # code
mlflow.end_run()
This solved my problem.
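For completeness, here is a small self-contained sketch of how this combines with one run per batch (the experiment name and the row counts are arbitrary placeholders, not from the question): set_experiment groups all retraining runs together in the MLflow UI, and each batch still gets its own run, so repeated param keys never collide.

import mlflow

# Group all retraining runs under one experiment (the name is just an example).
mlflow.set_experiment("twitter-sentiment")

for batch_no, rows in enumerate([32205, 32455], start=1):
    # One run per 250-message batch; params are scoped to the run,
    # so re-logging 'input_rows' for the next batch is allowed.
    with mlflow.start_run(run_name=f"batch-{batch_no}"):
        mlflow.log_param("input_rows", rows)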