PySpark 中的自定义评估器

Custom Evaluator in PySpark

我想使用排名指标 (MAP@k) 优化 PySpark 管道的超参数。我在文档中看到了如何使用 Evaluation (Scala) 中定义的指标,但我需要定义一个自定义评估器 class,因为 MAP@k 尚未实现。所以我需要做类似的事情:

model = Pipeline(stages=[indexer, assembler, scaler, lg])
paramGrid_lg = ParamGridBuilder() \
    .addGrid(lg.regParam, [0.001, 0.1]) \
    .addGrid(lg.elasticNetParam, [0, 1]) \
    .build()

crossval_lg = CrossValidator(estimator=model,
                      estimatorParamMaps=paramGrid_lg,
                      evaluator=MAPkEvaluator(), 
                      numFolds=2)

其中 MAPkEvaluator() 是我的自定义评估器。我看过 但没有答案。

是否有可用的示例或文档?有谁知道是否可以在 PySpark 中实现它?我应该实现什么方法?

@jarandaf 在第一条评论中回答了这个问题,但为了清楚起见,我写了如何使用随机指标实现一个基本示例:

import random
from pyspark.ml.evaluation import Evaluator

class RandomEvaluator(Evaluator):

    def __init__(self, predictionCol="prediction", labelCol="label"):
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset):
        """
        Returns a random number. 
        Implement here the true metric
        """
        return random.randint(0,1)

    def isLargerBetter(self):
        return True

现在下面的代码应该可以工作了:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid_lg = ParamGridBuilder() \
    .addGrid(lg.regParam, [0.01, 0.1]) \
    .addGrid(lg.elasticNetParam, [0, 1]) \
    .build()

crossval_lg = CrossValidator(estimator=model,
                      estimatorParamMaps=paramGrid_lg,
                      evaluator= RandomEvaluator(), 
                      numFolds=2)

cvModel = crossval_lg.fit(train_val_data_)

@Amanda 很好地回答了这个问题,但让我告诉你一些应该避免的事情。如果您查看 Evaluator() class 的帮助,请执行以下操作:

help(Evaluator())

你会看到那里定义了一个方法:

isLargerBetter(self)
 |      Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
 |      (True, default) or minimized (False).
 |      A given evaluator may support multiple metrics which may be maximized or minimized.
 |      
 |      .. versionadded:: 1.5.0

现在,如果您的指标需要最小化,您需要将此方法设置为:

def isLargerBetter(self):
        return False

当前方法的默认值为True

为@Amanda 的明确答案添加一个实际示例,以下代码可用于创建自定义 Evaulator,它计算二进制分类任务中的 F1-score。它可能没有优化(我实际上不知道是否有更有效的方法来实现指标),但它完成了工作。

import pyspark.sql.functions as F
from pyspark.ml.evaluation import Evaluator

class MyEvaluator(Evaluator):

    def __init__(self, predictionCol='prediction', labelCol='label'):
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset):
        tp = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 1)).count()
        fp = dataset.filter((F.col(self.labelCol) == 0) & (F.col(self.predictionCol) == 1)).count()
        fn = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 0)).count()
        f1 = (2 * tp) / (2 * tp + fp + fn)
        return f1

    def isLargerBetter(self):
        return True