Spark ML - failing to load model using MatrixFactorizationModel
I am trying to implement a recommender system using Spark collaborative filtering.
First I train the model and save it to disk:
MatrixFactorizationModel model = trainModel(inputDataRdd);
model.save(jsc.sc(), "/op/tc/model/");
When I load the model in a separate process, the program fails with the exception below.
Code:
static JavaSparkContext jsc;
private static Options options;

static {
    SparkConf conf = new SparkConf().setAppName("TC recommender application");
    conf.set("spark.driver.allowMultipleContexts", "true");
    jsc = new JavaSparkContext(conf);
}

MatrixFactorizationModel model = MatrixFactorizationModel.load(jsc.sc(),
        "/op/tc/model/");
Exception:
Exception in thread "main" java.io.IOException: Not a file:
maprfs:/op/tc/model/data
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:324)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
at org.apache.spark.rdd.RDD$$anonfun$aggregate.apply(RDD.scala:1114)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1107)
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.countApproxDistinctUserProduct(MatrixFactorizationModel.scala:96)
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:126)
at com.aexp.cxp.recommendation.ProductRecommendationIndividual.main(ProductRecommendationIndividual.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:742)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Is there any configuration I need to set in order to load the model? Any suggestion would be a great help.
As in any other distributed computing framework, it is very important in Spark to understand where your code actually runs when you try to debug it. It is also important to have access to the various logs. For example, on YARN you will have:
- the master logs, if you log them yourself
- the aggregated slave logs (thanks to YARN, a useful feature!)
- the YARN node manager (which will, for example, tell you why a container was killed, etc.)
- etc.
Digging into Spark issues can be very time-consuming if you do not look in the right place from the start. Now, more specifically to this question: you have a clear stack trace, which is not always the case, so you should use it to your advantage.
The top of the stack trace is:
Exception in thread "main" java.io.IOException: Not a file: maprfs:/op/tc/model/data
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:324)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at ...
As you can see, the Spark job was executing a map operation when it failed. Who executes the map? The slaves. So you must make sure your file is available on all the slaves, not just on the master.
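One practical way to act on this is to check, before calling load, that the model directory written by model.save is reachable through the cluster's shared filesystem rather than only on the local disk of the machine that saved it. The sketch below is not from the original post; it is a hypothetical helper that uses the Hadoop FileSystem API (reachable via the SparkContext's hadoopConfiguration) to list the data and metadata subdirectories that MatrixFactorizationModel.save produces.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: verify that the saved model is visible through the
// cluster-wide filesystem (HDFS, MapR-FS, ...) before calling
// MatrixFactorizationModel.load. model.save writes a data/ and a metadata/
// subdirectory under the given path; if either is missing here, the slaves
// will not be able to read it either.
public final class ModelPathCheck {

    public static void verify(Configuration hadoopConf, String modelDir) throws IOException {
        FileSystem fs = new Path(modelDir).getFileSystem(hadoopConf);
        for (String sub : new String[]{"data", "metadata"}) {
            Path part = new Path(modelDir, sub);
            if (!fs.exists(part)) {
                throw new IOException("Model part not found on shared filesystem: " + part);
            }
            // List what is actually there, so you can compare against the saving job.
            for (FileStatus status : fs.listStatus(part)) {
                System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
            }
        }
    }
}
```

Called as `ModelPathCheck.verify(jsc.hadoopConfiguration(), "/op/tc/model/")`, this fails fast with a readable message when the path only exists locally, instead of the opaque "Not a file" error raised later inside getSplits.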
More generally, you always need to make a clear distinction in your head between the code you write for the master and the code you write for the slaves. That will help you detect this kind of interaction, as well as references to non-serializable objects and other such common mistakes.
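To make the master/slave distinction concrete, here is a minimal, hypothetical sketch (not from the original code) of the serialization pitfall mentioned above: any driver-side field referenced inside an RDD lambda is shipped to the slaves along with its enclosing object.

```java
import org.apache.spark.api.java.JavaRDD;

class Recommender {
    // Lives on the driver only; java.sql.Connection is not Serializable.
    private java.sql.Connection conn;

    JavaRDD<String> tag(JavaRDD<String> lines) {
        // BAD: referencing this.conn inside the lambda forces Spark to serialize
        // the whole Recommender, and the job fails with "Task not serializable".
        // return lines.map(line -> line + conn.toString());

        // GOOD: copy only the serializable value the slaves actually need,
        // so the lambda captures a plain String and nothing else.
        final String suffix = ":tagged";
        return lines.map(line -> line + suffix);
    }
}
```

The "Task not serializable" error is reported on the driver at job submission time, which is one more reason to know which logs belong to which machine.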