How to save Spark's MatrixFactorizationModel recommendProductsForUsers to HBase
I am new to Spark, and I want to save the output of recommendProductsForUsers to an HBase table. I found an example (https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/) that shows how to save using a JavaPairRDD and saveAsNewAPIHadoopDataset.
How can I convert a JavaRDD<Tuple2<Object, Rating[]>>
into a JavaPairRDD<ImmutableBytesWritable, Put>
so that I can use saveAsNewAPIHadoopDataset?
// Load the trained MatrixFactorizationModel from HDFS
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(), trainedDataPath);
// Get the top-N recommendations for every user
JavaRDD<Tuple2<Object, Rating[]>> ratings3 = sameModel.recommendProductsForUsers(noOfProductsToReturn).toJavaRDD();
By using mapToPair. From the same source as the example you linked (I changed the types by hand):
JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = ratings3.mapToPair(
    new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {
        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> row) throws Exception {
            // The user id (row._1()) becomes the HBase row key.
            Put put = new Put(Bytes.toBytes(String.valueOf(row._1())));
            // Column values must be byte arrays; the original example read strings
            // from a Row, but here the Rating[] in row._2() has to be serialized.
            put.add(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQualifier1"), Bytes.toBytes(String.valueOf(row._2().length)));
            put.add(Bytes.toBytes("columnFamily"), Bytes.toBytes("columnQualifier2"), Bytes.toBytes(Arrays.toString(row._2())));
            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
        }
    });
It works like this: you create a new Put instance, pass it the row key in the constructor, and then call add for each column. Finally, you return the Put you created.
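For reference, the same conversion can be written more compactly with a Java 8 lambda, since PairFunction is a functional interface. This is only a sketch under assumed names: the column family "recommendation", the qualifier "ratings", and the Arrays.toString serialization are placeholders, not something fixed by the original example.
// Sketch: the same Tuple2 -> Put conversion written as a Java 8 lambda.
// Assumes import java.util.Arrays; family/qualifier names are placeholders.
JavaPairRDD<ImmutableBytesWritable, Put> hbasePutsLambda = ratings3.mapToPair(row -> {
    // Row key: the user id from the tuple.
    Put put = new Put(Bytes.toBytes(String.valueOf(row._1())));
    // Serialize the Rating[] into a single string column (illustrative only).
    put.addColumn(Bytes.toBytes("recommendation"), Bytes.toBytes("ratings"),
            Bytes.toBytes(Arrays.toString(row._2())));
    return new Tuple2<>(new ImmutableBytesWritable(), put);
});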
This is how I solved the problem above; I hope it helps someone.
JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts1 = ratings3
    .mapToPair(new PairFunction<Tuple2<Object, Rating[]>, ImmutableBytesWritable, Put>() {
        @Override
        public Tuple2<ImmutableBytesWritable, Put> call(Tuple2<Object, Rating[]> arg0)
                throws Exception {
            Rating[] userAndProducts = arg0._2();
            System.out.println("***********" + userAndProducts.length + "**************");
            // The user id becomes the HBase row key.
            Put put = new Put(Bytes.toBytes(String.valueOf(arg0._1())));
            String recommendedProduct = "";
            for (Rating r : userAndProducts) {
                // Some logic here to convert the Ratings into the column value,
                // e.g. recommendedProduct = String.valueOf(r.product());
            }
            put.addColumn(Bytes.toBytes("recommendation"), Bytes.toBytes("product"),
                    Bytes.toBytes(recommendedProduct));
            return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
        }
    });
System.out.println("*********** Number of records in JavaPairRdd: "+ hbasePuts1.count() +"**************");
hbasePuts1.saveAsNewAPIHadoopDataset(newApiJobConfig.getConfiguration());
jsc.stop();
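The snippet above uses newApiJobConfig without showing how it was created. For completeness, here is a minimal sketch of a job configuration that lets saveAsNewAPIHadoopDataset write Puts to HBase; the table name "recommendations" and the ZooKeeper quorum are assumptions to replace with your own values.
// Minimal sketch of the job configuration referenced above as newApiJobConfig.
// Assumes: import org.apache.hadoop.conf.Configuration;
//          import org.apache.hadoop.hbase.HBaseConfiguration;
//          import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
//          import org.apache.hadoop.mapreduce.Job;
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.zookeeper.quorum", "localhost"); // assumption: local ZooKeeper
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "recommendations"); // assumed table name
Job newApiJobConfig = Job.getInstance(hbaseConf);
newApiJobConfig.setOutputFormatClass(TableOutputFormat.class);
newApiJobConfig.setOutputKeyClass(ImmutableBytesWritable.class);
newApiJobConfig.setOutputValueClass(Put.class);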
We just open-sourced Splice Machine, and we have examples of integrating MLlib with queries and storage in Splice Machine. I don't know whether this helps, but I thought I would let you know.
http://community.splicemachine.com/use-spark-libraries-splice-machine/
Thanks for the post, this is great.