How to create a customized scoring function in scikit-learn for scoring a set of instances based on their individual properties?

I am trying to perform a GridSearchCV to optimize the hyperparameters of my classifier, which should happen by optimizing a custom scoring function. The problem is that the scoring function assigns a cost that differs per instance (the cost is also a feature of each instance). As shown in the example below, a third array test_amt is needed that holds the cost of each instance (in addition to the 'normal' scoring function that only gets y and y_pred):

    def calculate_costs(y_test, y_test_pred, test_amt):
        cost = 0

        for i in range(len(y_test)):  # iterate over all instances
            y = y_test.iloc[i]
            y_pred = y_test_pred.iloc[i]
            x_amt = test_amt.iloc[i]

            if y == 0 and y_pred == 0:
                cost -= x_amt * 1.1   # true negative
            elif y == 0 and y_pred == 1:
                cost += x_amt         # false positive
            elif y == 1 and y_pred == 0:
                cost += x_amt * 1.1   # false negative
            elif y == 1 and y_pred == 1:
                cost += 0             # true positive, no cost
            else:
                print("ERROR! No cost could be assigned to the instance: " + str(i))
        return cost

When I call this function after training with the three arrays, it computes the total cost produced by the model perfectly. However, integrating it into GridSearchCV is difficult, because the scoring function only expects two parameters. While it is possible to pass additional kwargs to the scorer, I have no clue how to pass a subset that depends on the split GridSearchCV is currently working on.
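To illustrate, here is a minimal sketch of the kwargs route (assuming the calculate_costs function and the full test_amt Series from above): make_scorer forwards extra keyword arguments to the scoring function, but they are bound once at creation time, so they cannot follow the rows GridSearchCV selects for each fold:

    from sklearn.metrics import make_scorer

    # test_amt is frozen here; inside GridSearchCV each fold only contains a
    # subset of the rows, so the positions no longer line up with test_amt.
    naive_scorer = make_scorer(calculate_costs, greater_is_better=False,
                               test_amt=test_amt)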

What I have thought of / tried so far:

  1. Wrapping the whole pipeline in a class with a globally stored pandas.Series object that holds the cost of each instance by index. It would then theoretically be possible to look up an instance's cost by referencing it with the same index. Unfortunately, this does not work, because scikit-learn converts everything into numpy arrays:

    def calculate_costs_class(self, y_test, y_test_pred):
        cost = 0
        for index, _ in y_test.items():
            y = y_test.loc[index]
            y_pred = y_test_pred.loc[index]
            x_amt = self.test_amt.loc[index]  # look up the cost via the shared index

            if y == 0 and y_pred == 0:
                cost += (x_amt * (-1)) + 5 + (x_amt * 0.1)  # -revenue, +shipping, +fees
            elif y == 0 and y_pred == 1:
                cost += x_amt  # +revenue
            elif y == 1 and y_pred == 0:
                cost += x_amt + 5 + (x_amt * 0.1) + 5  # +revenue, +shipping, +fees, +charge cost
            elif y == 1 and y_pred == 1:
                cost += 0  # nothing
            else:
                print("ERROR! No cost could be assigned to the instance: " + str(index))
        return cost
    
  2. Creating a custom PseudoInt class as the data type of the labels, which inherits all properties of int but is also able to store the cost of an instance (while keeping all its properties for applying logical operations). Although this even works outside of scikit-learn, the check_classification_targets method in scikit-learn raises a ValueError: Unknown label type: 'unknown' error:

    class PseudoInt(int):
        def __new__(cls, x, cost, *args, **kwargs):
            instance = int.__new__(cls, x, *args, **kwargs)
            instance.cost = cost
            return instance
    
  3. Not tried, but thought of: since the cost is also a feature of the instance set X, it is also available inside the __call__ method of the _PredictScorer(_BaseScorer) class in scikit-learn's scorer.py. If I reprogrammed the call function to pass the cost array as a subset of X to score_func, I would have the cost as well (a sketch of this idea via the public scoring-callable interface follows after this list).

  4. Or: I could just implement everything myself.
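For completeness, idea 3 can also be sketched without patching scorer.py: GridSearchCV accepts any callable with the signature (estimator, X, y) as its scoring argument, and that callable receives exactly the rows of the current split. A minimal sketch, under the assumption that the cost is stored in the last column of X:

    def cost_scoring(estimator, X, y):
        # X holds only the rows of the current CV split, so the cost column
        # stays aligned with y. The cost being the last column of X is an
        # assumption made for this sketch.
        y_pred = estimator.predict(X)
        amounts = X[:, -1]
        cost = 0
        for y_true_i, y_pred_i, amt in zip(y, y_pred, amounts):
            if y_true_i == 0 and y_pred_i == 0:
                cost -= amt * 1.1
            elif y_true_i == 0 and y_pred_i == 1:
                cost += amt
            elif y_true_i == 1 and y_pred_i == 0:
                cost += amt * 1.1
        return -cost  # GridSearchCV maximizes the score; lower cost is better

    # e.g. GridSearchCV(pipe, param_grid, scoring=cost_scoring, cv=5)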

Is there an "easier" solution?

I found a way to solve the problem by following the path of my second suggestion: passing a PseudoInteger to scikit-learn that has all the same properties as a normal integer when compared or when mathematical operations are applied. However, it also acts as a wrapper around int and can store instance variables (such as the cost of an instance). As already stated in the question, this causes scikit-learn to recognize that the values inside the passed label array are actually of type object rather than int. So I just replaced the test in the type_of_target(y) method of scikit-learn's multiclass.py in line 273 to return 'binary', even though it fails the test. So scikit-learn just treats the whole problem (as it should) as a binary classification problem. Lines 269-273 in the type_of_target(y) method in multiclass.py now look like:

    # Invalid inputs
    if y.ndim > 2 or (y.dtype == object and len(y) and
                      not isinstance(y.flat[0], string_types)):
        # return 'unknown'  # [[[1, 2]]] or [obj_1] and not ["label_1"]
        return 'binary' # Sneaky, modified to force binary classification.

My code now looks like this:

    import sklearn
    import sklearn.model_selection
    import sklearn.base
    import sklearn.metrics
    import numpy as np
    import sklearn.tree
    import sklearn.feature_selection
    from sklearn.model_selection import GridSearchCV
    from sklearn.pipeline import Pipeline
    from sklearn.metrics import make_scorer


    class PseudoInt(int):
        # Behaves like an integer, but is able to store instance variables
        pass
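
    # Illustrative sanity check (hypothetical, added for clarity): instances
    # of the subclass get a __dict__, so arbitrary attributes can be attached
    # while all int semantics are preserved.
    _p = PseudoInt(1)
    _p.cost = 19.99
    assert _p == 1 and _p + 1 == 2 and _p.cost == 19.99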


    def grid_search(x, y_normal, x_amounts):
        # Change the label set to a np array containing pseudo ints with the costs associated with the instances
        y = np.empty(len(y_normal), dtype=PseudoInt)
        for index, value in y_normal.items():
            new_int = PseudoInt(value)
            new_int.cost = x_amounts.loc[index]  # Here the cost is added to the label
            y[index] = new_int

        # Normal train test split
        x_train, x_test, y_train, y_test = sklearn.model_selection.train_test_split(x, y, test_size=0.2)

        # Classifier
        clf = sklearn.tree.DecisionTreeClassifier()

        # Custom scorer with the cost function below (lower cost is better)
        cost_scorer = make_scorer(cost_function, greater_is_better=False)

        # Define pipeline
        pipe = Pipeline([('clf', clf)])

        # Grid search grid with any hyper parameters or other settings
        # (the step is named 'clf', so its parameters are addressed as 'clf__...')
        param_grid = [
            {'clf__criterion': ['gini', 'entropy']}
        ]

        # Grid search and pass the custom scorer function
        gs = GridSearchCV(estimator=pipe,
                          param_grid=param_grid,
                          scoring=cost_scorer,
                          n_jobs=1,
                          cv=5,
                          refit=True)

        # Run grid search and refit with best hyper parameters
        gs = gs.fit(x_train.values, y_train)
        print("Best Parameters: " + str(gs.best_params_))
        print('Best Score: ' + str(gs.best_score_))

        # Predict with retrained model (with best parameters)
        y_test_pred = gs.predict(x_test.values)

        # Get scores (also cost score)
        get_scores(y_test, y_test_pred)


    def get_scores(y_test, y_test_pred):
        print("Getting scores")

        print("SCORES")
        precision = sklearn.metrics.precision_score(y_test, y_test_pred)
        recall = sklearn.metrics.recall_score(y_test, y_test_pred)
        f1_score = sklearn.metrics.f1_score(y_test, y_test_pred)
        accuracy = sklearn.metrics.accuracy_score(y_test, y_test_pred)
        print("Precision      " + str(precision))
        print("Recall         " + str(recall))
        print("Accuracy       " + str(accuracy))
        print("F1_Score       " + str(f1_score))

        print("COST")
        cost = cost_function(y_test, y_test_pred)
        print("Cost Savings   " + str(-cost))

        print("CONFUSION MATRIX")
        cnf_matrix = sklearn.metrics.confusion_matrix(y_test, y_test_pred)
        cnf_matrix = cnf_matrix.astype('float') / cnf_matrix.sum(axis=1)[:, np.newaxis]  # row-normalize
        print(cnf_matrix)


    def cost_function(y_test, y_test_pred):
        """
        Calculates total cost based on TP, FP, TN, FN and the cost of a certain instance
        :param y_test: Has to be an array of PseudoInts containing the cost of each instance
        :param y_test_pred: Any array of PseudoInts or ints
        :return: Returns total cost
        """
        cost = 0

        for index in range(len(y_test)):
            y = y_test[index]
            y_pred = y_test_pred[index]
            x_amt = y.cost  # the cost travels with the label itself

            if y == 0 and y_pred == 0:
                cost -= x_amt  # Reducing cost by x_amt
            elif y == 0 and y_pred == 1:
                cost += x_amt  # Wrong classification adds cost
            elif y == 1 and y_pred == 0:
                cost += x_amt + 5  # Wrong classification adds cost and fee
            elif y == 1 and y_pred == 1:
                cost += 0  # No cost
            else:
                raise ValueError("No cost could be assigned to the instance: " + str(index))

        return cost

Update

Instead of changing the files of the package directly (which is kind of dirty), I now add the following to the first import lines of my project:

    # Monkey-patch type_of_target before any other sklearn import picks it up
    import sklearn.utils.multiclass

    def return_binary(y):
        return "binary"

    sklearn.utils.multiclass.type_of_target = return_binary

This overrides the type_of_target(y) method in sklearn.utils.multiclass so that it always returns 'binary'. Note that it has to come before all other sklearn imports.
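
Putting it together, a hypothetical driver with synthetic data could look like the sketch below (all data and names are illustrative; the override above has to run first):

    import numpy as np
    import pandas as pd

    # Purely synthetic data for illustration
    n = 200
    x = pd.DataFrame({'feature_a': np.random.rand(n),
                      'amount': np.random.rand(n) * 100})
    y_normal = pd.Series(np.random.randint(0, 2, size=n))
    x_amounts = x['amount']

    grid_search(x, y_normal, x_amounts)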