如何为 xgboost 实施增量训练?

How can I implement incremental training for xgboost?

问题是由于训练数据大小,我的训练数据无法放入 RAM。所以我需要一种方法,首先在整个训练数据集上构建一棵树,计算残差构建另一棵树等等(就像梯度提升树一样)。显然,如果我在某个循环中调用 model = xgb.train(param, batch_dtrain, 2) - 它不会有帮助,因为在这种情况下它只会为每个批次重建整个模型。

在第一批训练后尝试保存模型。然后,在连续运行时,为 xgb.train 方法提供已保存模型的文件路径。

这是一个小实验,我 运行 说服自己它有效:

首先,将波士顿数据集拆分为训练集和测试集。 然后将训练集分成两半。 将模型与上半场拟合并获得将作为基准的分数。 然后用下半部分拟合两个模型;一个模型将具有附加参数 xgb_model。如果传入额外参数没有影响,那么我们希望它们的分数相似。 但是,幸运的是,新模型的性能似乎比第一个好得多。

import xgboost as xgb
from sklearn.cross_validation import train_test_split as ttsplit
from sklearn.datasets import load_boston
from sklearn.metrics import mean_squared_error as mse

X = load_boston()['data']
y = load_boston()['target']

# split data into training and testing sets
# then split training set in half
X_train, X_test, y_train, y_test = ttsplit(X, y, test_size=0.1, random_state=0)
X_train_1, X_train_2, y_train_1, y_train_2 = ttsplit(X_train, 
                                                     y_train, 
                                                     test_size=0.5,
                                                     random_state=0)

xg_train_1 = xgb.DMatrix(X_train_1, label=y_train_1)
xg_train_2 = xgb.DMatrix(X_train_2, label=y_train_2)
xg_test = xgb.DMatrix(X_test, label=y_test)

params = {'objective': 'reg:linear', 'verbose': False}
model_1 = xgb.train(params, xg_train_1, 30)
model_1.save_model('model_1.model')

# ================= train two versions of the model =====================#
model_2_v1 = xgb.train(params, xg_train_2, 30)
model_2_v2 = xgb.train(params, xg_train_2, 30, xgb_model='model_1.model')

print(mse(model_1.predict(xg_test), y_test))     # benchmark
print(mse(model_2_v1.predict(xg_test), y_test))  # "before"
print(mse(model_2_v2.predict(xg_test), y_test))  # "after"

# 23.0475232194
# 39.6776876084
# 27.2053239482

参考:https://github.com/dmlc/xgboost/blob/master/python-package/xgboost/training.py

现在(版本 0.6?)有一个可能有用的 process_update 参数。这是一个实验:

import pandas as pd
import xgboost as xgb
from sklearn.model_selection import ShuffleSplit
from sklearn.datasets import load_boston
from sklearn.metrics import mean_squared_error as mse

boston = load_boston()
features = boston.feature_names
X = boston.data
y = boston.target

X=pd.DataFrame(X,columns=features)
y = pd.Series(y,index=X.index)

# split data into training and testing sets
rs = ShuffleSplit(test_size=0.3, n_splits=1, random_state=0)
for train_idx,test_idx in rs.split(X):  # this looks silly
    pass

train_split = round(len(train_idx) / 2)
train1_idx = train_idx[:train_split]
train2_idx = train_idx[train_split:]
X_train = X.loc[train_idx]
X_train_1 = X.loc[train1_idx]
X_train_2 = X.loc[train2_idx]
X_test = X.loc[test_idx]
y_train = y.loc[train_idx]
y_train_1 = y.loc[train1_idx]
y_train_2 = y.loc[train2_idx]
y_test = y.loc[test_idx]

xg_train_0 = xgb.DMatrix(X_train, label=y_train)
xg_train_1 = xgb.DMatrix(X_train_1, label=y_train_1)
xg_train_2 = xgb.DMatrix(X_train_2, label=y_train_2)
xg_test = xgb.DMatrix(X_test, label=y_test)

params = {'objective': 'reg:linear', 'verbose': False}
model_0 = xgb.train(params, xg_train_0, 30)
model_1 = xgb.train(params, xg_train_1, 30)
model_1.save_model('model_1.model')
model_2_v1 = xgb.train(params, xg_train_2, 30)
model_2_v2 = xgb.train(params, xg_train_2, 30, xgb_model=model_1)

params.update({'process_type': 'update',
               'updater'     : 'refresh',
               'refresh_leaf': True})
model_2_v2_update = xgb.train(params, xg_train_2, 30, xgb_model=model_1)

print('full train\t',mse(model_0.predict(xg_test), y_test)) # benchmark
print('model 1 \t',mse(model_1.predict(xg_test), y_test))  
print('model 2 \t',mse(model_2_v1.predict(xg_test), y_test))  # "before"
print('model 1+2\t',mse(model_2_v2.predict(xg_test), y_test))  # "after"
print('model 1+update2\t',mse(model_2_v2_update.predict(xg_test), y_test))  # "after"

输出:

full train   17.8364309709
model 1      24.2542132108
model 2      25.6967017352
model 1+2    22.8846455135
model 1+update2  14.2816257268

我创建 a gist of jupyter notebook 是为了证明可以增量训练 xgboost 模型。我使用波士顿数据集来训练模型。我做了 3 个实验——一次性学习、迭代一次性学习、迭代增量学习。在增量训练中,我将波士顿数据以 50 个大小的批次传递给模型。

要点的要点是您必须多次迭代数据以使模型收敛到一次性(所有数据)学习所达到的精度。

下面是使用xgboost进行迭代增量学习的对应代码。

batch_size = 50
iterations = 25
model = None
for i in range(iterations):
    for start in range(0, len(x_tr), batch_size):
        model = xgb.train({
            'learning_rate': 0.007,
            'update':'refresh',
            'process_type': 'update',
            'refresh_leaf': True,
            #'reg_lambda': 3,  # L2
            'reg_alpha': 3,  # L1
            'silent': False,
        }, dtrain=xgb.DMatrix(x_tr[start:start+batch_size], y_tr[start:start+batch_size]), xgb_model=model)

        y_pr = model.predict(xgb.DMatrix(x_te))
        #print('    MSE itr@{}: {}'.format(int(start/batch_size), sklearn.metrics.mean_squared_error(y_te, y_pr)))
    print('MSE itr@{}: {}'.format(i, sklearn.metrics.mean_squared_error(y_te, y_pr)))

y_pr = model.predict(xgb.DMatrix(x_te))
print('MSE at the end: {}'.format(sklearn.metrics.mean_squared_error(y_te, y_pr)))

XGBoost 版本:0.6

如果您的问题与数据集大小有关并且您并不真正需要增量学习(例如,您不是在处理流媒体应用程序),那么您应该查看Spark 或 Flink。

这两个框架可以在非常大的数据集上进行训练,RAM 很小,可以利用磁盘内存。这两个框架都在内部处理内存问题。虽然 Flink 首先解决了它,但 Spark 在最近的版本中赶上了。

看看:

看起来除了再次调用 xgb.train(....) 之外您不需要任何其他东西,但请提供上一批次的模型结果:

# python
params = {} # your params here
ith_batch = 0
n_batches = 100
model = None
while ith_batch < n_batches:
    d_train = getBatchData(ith_batch)
    model = xgb.train(params, d_train, xgb_model=model)
    ith_batch += 1

这是基于https://xgboost.readthedocs.io/en/latest/python/python_api.html

对于 paulperry 的代码,如果将 "train_split = round(len(train_idx) / 2)" 中的一行更改为 "train_split = len(train_idx) - 50"。模型 1+update2 将从 14.2816257268 更改为 45.60806270012028。很多 "leaf=0" 导致转储文件。

当更新样本集相对较小时,更新模型效果不佳。 对于binary:logistic,当更新样本集只有一个class时,更新模型无法使用。

我没有测试过的一个可能的解决方案是使用一个 dask 数据帧,它应该与 pandas 数据帧一样,但(我假设)利用磁盘并读入和读出 RAM。这里有一些有用的链接。 this link 提到如何与 xgboost 一起使用另见 also see。 此外,这里还有 an experimental options from XGBoost,但它“尚未准备好生产”

Hey guys you can use my simple code for incremental model training with xgb base class :

    batch_size = 10000000


    X_train="your pandas training DataFrame" 
    y_train="Your lables"
    
    #Store eval results
    evals_result={}
    Deval = xgb.DMatrix(X_valid, y_valid)
    eval_sets = [(Dtrain, 'train'), (Deval, 'eval')]
    for start in range(0, n, batch_size):
           model = xgb.train({'refresh_leaf': True, 
                         'process_type': 'default', 
                         'max_depth': 5, 
                         'objective': 'reg:squarederror', 
                         'num_parallel_tree': 2,
                        'learning_rate':0.05,
                        'n_jobs':-1},
                        dtrain=xgb.DMatrix(X_train, y_train), evals=eval_sets, early_stopping_rounds=5,num_boost_round=100,evals_result=evals_result,xgb_model=model) 

不是基于xgboost,而是有C++增量决策树
参见 gaenari

可以插入和更新连续分块数据,如果概念漂移会降低准确性,则可以运行重建。

我同意@desertnaut 的解决方案。

我有一个数据集,我将其分成 4 个批次。我必须首先在没有 xgb_model 参数的情况下进行初始拟合,然后下一次拟合将具有 xgb_model 参数,如下所示(我使用的是 Sklearn API):

for i, (X_batch, y_batch) in enumerate(zip(self.X_train_batched, self.y_train_batched)):
    print(f'Step: {i}',end = ' ')
    if i == 0:
        model_xgbc.fit(X_batch, y_batch, eval_set=[(self.X_valid, self.y_valid)],
                        verbose=False, eval_metric = ['logloss'],
                        early_stopping_rounds = 400)
    else:
        model_xgbc.fit(X_batch, y_batch, eval_set=[(self.X_valid, self.y_valid)],
                        verbose=False, eval_metric = ['logloss'],
                        early_stopping_rounds = 400, xgb_model=model_xgbc)
            
    preds = model_xgbc.predict(self.X_valid)
    
    rmse = metrics.mean_squared_error(self.y_valid, preds,squared=False)