PySpark 中的自定义评估器
Custom Evaluator in PySpark
我想使用排名指标 (MAP@k) 优化 PySpark 管道的超参数。我在文档中看到了如何使用 Evaluation (Scala) 中定义的指标,但我需要定义一个自定义评估器 class,因为 MAP@k 尚未实现。所以我需要做类似的事情:
model = Pipeline(stages=[indexer, assembler, scaler, lg])
paramGrid_lg = ParamGridBuilder() \
.addGrid(lg.regParam, [0.001, 0.1]) \
.addGrid(lg.elasticNetParam, [0, 1]) \
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator=MAPkEvaluator(),
numFolds=2)
其中 MAPkEvaluator()
是我的自定义评估器。我看过 但没有答案。
是否有可用的示例或文档?有谁知道是否可以在 PySpark 中实现它?我应该实现什么方法?
@jarandaf 在第一条评论中回答了这个问题,但为了清楚起见,我写了如何使用随机指标实现一个基本示例:
import random
from pyspark.ml.evaluation import Evaluator
class RandomEvaluator(Evaluator):
def __init__(self, predictionCol="prediction", labelCol="label"):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
"""
Returns a random number.
Implement here the true metric
"""
return random.randint(0,1)
def isLargerBetter(self):
return True
现在下面的代码应该可以工作了:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid_lg = ParamGridBuilder() \
.addGrid(lg.regParam, [0.01, 0.1]) \
.addGrid(lg.elasticNetParam, [0, 1]) \
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator= RandomEvaluator(),
numFolds=2)
cvModel = crossval_lg.fit(train_val_data_)
@Amanda 很好地回答了这个问题,但让我告诉你一些应该避免的事情。如果您查看 Evaluator()
class 的帮助,请执行以下操作:
help(Evaluator())
你会看到那里定义了一个方法:
isLargerBetter(self)
| Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
| (True, default) or minimized (False).
| A given evaluator may support multiple metrics which may be maximized or minimized.
|
| .. versionadded:: 1.5.0
现在,如果您的指标需要最小化,您需要将此方法设置为:
def isLargerBetter(self):
return False
当前方法的默认值为True
。
为@Amanda 的明确答案添加一个实际示例,以下代码可用于创建自定义 Evaulator
,它计算二进制分类任务中的 F1-score。它可能没有优化(我实际上不知道是否有更有效的方法来实现指标),但它完成了工作。
import pyspark.sql.functions as F
from pyspark.ml.evaluation import Evaluator
class MyEvaluator(Evaluator):
def __init__(self, predictionCol='prediction', labelCol='label'):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
tp = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 1)).count()
fp = dataset.filter((F.col(self.labelCol) == 0) & (F.col(self.predictionCol) == 1)).count()
fn = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 0)).count()
f1 = (2 * tp) / (2 * tp + fp + fn)
return f1
def isLargerBetter(self):
return True
我想使用排名指标 (MAP@k) 优化 PySpark 管道的超参数。我在文档中看到了如何使用 Evaluation (Scala) 中定义的指标,但我需要定义一个自定义评估器 class,因为 MAP@k 尚未实现。所以我需要做类似的事情:
model = Pipeline(stages=[indexer, assembler, scaler, lg])
paramGrid_lg = ParamGridBuilder() \
.addGrid(lg.regParam, [0.001, 0.1]) \
.addGrid(lg.elasticNetParam, [0, 1]) \
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator=MAPkEvaluator(),
numFolds=2)
其中 MAPkEvaluator()
是我的自定义评估器。我看过
是否有可用的示例或文档?有谁知道是否可以在 PySpark 中实现它?我应该实现什么方法?
@jarandaf 在第一条评论中回答了这个问题,但为了清楚起见,我写了如何使用随机指标实现一个基本示例:
import random
from pyspark.ml.evaluation import Evaluator
class RandomEvaluator(Evaluator):
def __init__(self, predictionCol="prediction", labelCol="label"):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
"""
Returns a random number.
Implement here the true metric
"""
return random.randint(0,1)
def isLargerBetter(self):
return True
现在下面的代码应该可以工作了:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid_lg = ParamGridBuilder() \
.addGrid(lg.regParam, [0.01, 0.1]) \
.addGrid(lg.elasticNetParam, [0, 1]) \
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator= RandomEvaluator(),
numFolds=2)
cvModel = crossval_lg.fit(train_val_data_)
@Amanda 很好地回答了这个问题,但让我告诉你一些应该避免的事情。如果您查看 Evaluator()
class 的帮助,请执行以下操作:
help(Evaluator())
你会看到那里定义了一个方法:
isLargerBetter(self)
| Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
| (True, default) or minimized (False).
| A given evaluator may support multiple metrics which may be maximized or minimized.
|
| .. versionadded:: 1.5.0
现在,如果您的指标需要最小化,您需要将此方法设置为:
def isLargerBetter(self):
return False
当前方法的默认值为True
。
为@Amanda 的明确答案添加一个实际示例,以下代码可用于创建自定义 Evaulator
,它计算二进制分类任务中的 F1-score。它可能没有优化(我实际上不知道是否有更有效的方法来实现指标),但它完成了工作。
import pyspark.sql.functions as F
from pyspark.ml.evaluation import Evaluator
class MyEvaluator(Evaluator):
def __init__(self, predictionCol='prediction', labelCol='label'):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
tp = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 1)).count()
fp = dataset.filter((F.col(self.labelCol) == 0) & (F.col(self.predictionCol) == 1)).count()
fn = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 0)).count()
f1 = (2 * tp) / (2 * tp + fp + fn)
return f1
def isLargerBetter(self):
return True