Memory usage issue when training the ML model after every specific interval

I am trying to run an ML training script at a 1-hour interval, but after every hour the memory usage grows by roughly 20%. After 3-4 hours it reaches 90%, and then the script throws a MemoryError. I would like to know why the memory is not released when the train function finishes.

This behavior does not show up if I run the training function manually (calling it two or three times in a row, without any kind of threaded scheduler).
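To check whether the growth comes from Python-level allocations that survive each run, one way is to snapshot memory with the standard library's tracemalloc module. This is a minimal sketch, not part of the original script; the labels and the allocation are illustrative:

```python
import tracemalloc

tracemalloc.start()

def report_memory(tag):
    # Print how much memory Python currently has allocated, plus the
    # peak observed since tracemalloc.start() was called.
    current, peak = tracemalloc.get_traced_memory()
    print(f"{tag}: current={current / 1e6:.1f} MB, peak={peak / 1e6:.1f} MB")

report_memory("before")
data = list(range(1_000_000))   # stand-in for data loaded during training
report_memory("after allocation")
del data
report_memory("after del")
```

Calling something like `report_memory("after train")` at the end of each scheduled run would show whether objects from previous runs are still alive.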

Any suggestions on how to train the model after every specific interval?

Here is the code.

import pickle
import pandas as pd
from pymongo import MongoClient
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

def train():

    client = MongoClient(databaseURI)
    db = client['mydb']

    movie_data = []
    for obj in db.movies.find({}):
        movie_obj = {}

        movie_obj['_id'] = obj['_id']
        movie_obj['title'] = obj['title']
        movie_obj['rating'] = obj['rating']
        movie_data.append(movie_obj)


    user_data = []
    for obj in db.users.find({}):
        user_obj = {}

        user_obj['_id'] = obj['_id']
        user_obj['username'] = obj['username']
        user_obj['movie_id'] = obj['movie_id']
        user_obj['rating'] = obj['rating']
        user_data.append(user_obj)


    movie_data_df = pd.DataFrame(movie_data)
    user_data_df = pd.DataFrame(user_data)

    # some ML training ALGO
    trainedModel = algo.train(user_data_df, movie_data_df)

    with open('files/trained.pkl', 'wb') as f:
        pickle.dump(trainedModel, f)

    client.close()


scheduler = BlockingScheduler()
scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())
scheduler.start()

From the APScheduler documentation on job stores:

Job stores house the scheduled jobs. The default job store simply keeps the jobs in memory, but others store them in various kinds of databases. A job’s data is serialized when it is saved to a persistent job store, and deserialized when it’s loaded back from it. Job stores (other than the default one) don’t keep the job data in memory, but act as middlemen for saving, loading, updating and searching jobs in the backend.

I suggest trying one of the following solutions:

  1. Change the job store from the default (i.e. in-memory) one to some persistent store (Example).

  2. Or try setting the parameter replace_existing to True (it defaults to False).

    scheduler.add_job(train, 'interval', hours=1, 
                      next_run_time=datetime.datetime.now(), replace_existing=True)
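
For option 1, a persistent job store can be configured roughly like this (a sketch, assuming the SQLAlchemy extra is installed; the SQLite URL and the job id `train_job` are illustrative):

```python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Persist job state in a SQLite file instead of keeping it in memory.
jobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}

scheduler = BlockingScheduler(jobstores=jobstores)
# With a persistent store, give the job an explicit id and set
# replace_existing=True so restarts don't register duplicate jobs.
scheduler.add_job(train, 'interval', hours=1, id='train_job',
                  replace_existing=True)
scheduler.start()
```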
    

Side note:

I think there may also be another, uglier way to fix it (I have not tried it!): you could add a Listener that listens for crashes and restarts the whole thing! (If you try it, please adapt it in a more Pythonic way!)

import gc
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())

def my_listener(event):
    if event.exception:       
        global scheduler
        scheduler.shutdown()
        gc.collect()
        scheduler = BlockingScheduler()
        scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())
        scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        scheduler.start()

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()
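
If neither option helps, a heavier-handed workaround (not from the original answer, so treat it as an assumption) is to run each training pass in a short-lived child process: when the child exits, the operating system reclaims all of its memory, so nothing can accumulate between scheduled runs. A self-contained sketch, with a stand-in for the real train():

```python
from multiprocessing import Process

def train():
    # Stand-in for the real train() from the question; any memory it
    # allocates lives only inside the child process.
    _ = [i * i for i in range(1_000_000)]

def run_train_in_child():
    # Each scheduled run gets a fresh process; its memory is returned
    # to the OS on exit, so leaks cannot build up across intervals.
    p = Process(target=train)
    p.start()
    p.join()

if __name__ == '__main__':
    run_train_in_child()
    # Schedule the wrapper instead of train() itself, e.g.:
    # scheduler.add_job(run_train_in_child, 'interval', hours=1)
```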