pyspark:交叉验证器不起作用

pyspark: CrossValidator not work

我正在尝试调整 ALS 的参数,但总是选择第一个参数作为最佳选项

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from math import sqrt

from operator import add

conf = (SparkConf()
         .setMaster("local[4]")
         .setAppName("Myapp")
         .set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)

sqlContext = SQLContext(sc)
def computeRmse(data):
    return (sqrt(data.map(lambda x: (x[2] - x[3]) ** 2).reduce(add) / float(data.count())))

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])

lr1 = ALS()
grid1 = ParamGridBuilder().addGrid(lr1.regParam, [1.0,0.005,2.0]).build()
evaluator1 = RegressionEvaluator(predictionCol=lr1.getPredictionCol(),labelCol=lr1.getRatingCol(), metricName='rmse')
cv1 = CrossValidator(estimator=lr1, estimatorParamMaps=grid1, evaluator=evaluator1, numFolds=2)
cvModel1 = cv1.fit(dfRatings)
a=cvModel1.transform(dfRatings)
print ('rmse with cross validation: {}'.format(computeRmse(a)))

for reg_param in (1.0,0.005,2.0):
    lr = ALS(regParam=reg_param)
    model = lr.fit(dfRatings)
    print ('reg_param: {}, rmse: {}'.format(reg_param,computeRmse(model.transform(dfRatings))))

输出:
rmse 交叉验证:1.1820489116858794
reg_param:1.0,rmse:1.1820489116858794
reg_param:0.005,rmse:0.001573816765686575
reg_param: 2.0, rmse: 2.1056964491942787

有什么帮助吗?

提前致谢,

在您的 CrossValidator 中,您将折叠数固定为 1。但是,参数 numFolds must be >=2。仅使用一次失败分离成训练集和测试集的想法。

撇开其他问题不谈,您只是没有使用足够的数据来执行有意义的交叉验证和评估。正如我在 中解释和说明的那样,当训练集中缺少用户或项目时,ALS 无法提供预测。

这意味着交叉验证期间的每个拆分都会有未定义的预测,并且整体评估将是未定义的。因此 CrossValidator 将 return 第一个可能的模型,因为从它的角度来看,你训练的所有模型都同样糟糕。

我实施了一个 Pipeline 解决方案,我在管道的最后阶段添加了一个自定义转换器,这样 nan 预测就会被丢弃。请注意,此实现适用于 Spark < 2.2.0,因为未引入关键字 coldStartStrategy。因此,如果您使用的是 Spark==2.2.0,则不需要额外的阶段。

首先,我介绍应用 nan 滴的自定义转换器。

from pyspark.ml import Transformer

class DropNAPredictions(Transformer):
    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

现在我可以构建我的管道并使用交叉验证进行训练:

dropna = DropNAPredictions()

als = ALS(maxIter=10, userCol="player", itemCol="item", ratingCol="rating", implicitPrefs=False)

pipeline = Pipeline(stages=[als, dropna])
paramGrid = ParamGridBuilder().addGrid(als.regParam, [0.1, 0.05]) \
    .addGrid(als.rank, [1, 3]) \
    .build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(labelCol="rating"),
                    numFolds=3)

cvModel = cv.fit(training)

关于持久性的说明:由于自定义转换器,管道无法保存。有一个 post 讨论了用于序列化自定义转换器的选项,但我还没有深入研究解决方案。作为临时解决方案,您可以仅序列化 ALS 模型本身,然后通过将自定义转换器添加到管道来重建管道。

bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[0]  # extracts the ALS model
bestModel.save("s2s_als_stage")

from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.recommendation import ALSModel

mymodel = ALSModel.load('s2s_als_stage')
pipeline = PipelineModel(stages=[mymodel, dropna])  # dropna is the custom transformer
pred_test = pipeline.transform(test)  # score test data