How to get string values in RDD while implementing spark fp growth?
I have successfully joined userID and match in the query below.
var queryToGroupCustomers = "SELECT yt.userID as player," +
" concat_ws(\",\", collect_set(match)) AS matchesPlayedOn" + //concat_ws()
" FROM recommendationengine.sportsbookbets_orc yt" +
" where yt.userID is not null " + leagueCondition + "'" +
" GROUP BY yt.userID"
Now I want to pass that column into an RDD for use in the algorithm. My implementation uses the generic Row format:

val transactions: RDD[Array[String]] = results.rdd.map( row => row.get(2).toString.split(","))

but I get the following error:
17/03/27 23:28:51 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 29)
java.lang.ArrayIndexOutOfBoundsException: 2
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200)
A sample row from the joined dataset looks like this:
ff6e96d4-e243-4046-8e02-ce3d4b459a5d Napoli - Crotone, AC Milan - Juventus, Torino - Juventus, AS Roma - AC Milan, Empoli - Bologna, AC Milan - Internazionale, Genoa - AC Milan, Sassuolo - Chievo Verona, Sassuolo - Genoa
My full implementation of the algorithm so far:
// Has all customers and their bets
var queryToGroupCustomers = "SELECT yt.userID as player," +
" concat_ws(\",\", collect_set(match)) AS matchesPlayedOn" + //concat_ws()
" FROM recommendationengine.sportsbookbets_orc yt" +
" where yt.userID is not null " + leagueCondition + "'" +
" GROUP BY yt.userID"
println("Executing query: \n\n" + queryToGroupCustomers)
var results = hc.sql(queryToGroupCustomers).cache()
val transactions: RDD[Array[String]] = results.rdd.map( row => row.get(2).toString.split(","))
// Set configurations for FP-Growth
val fpg = new FPGrowth()
.setMinSupport(0.5)
.setNumPartitions(10)
// Generate model
val model = fpg.run(transactions);
println("\n\n Starting FPGrowth\n\n")
model.freqItemsets.collect().foreach { itemset =>
println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
Any suggestions would be much appreciated... thanks
Your row has 2 fields, and row.get(2) fetches the value of its third field (as usual, fields in a row are indexed starting from 0), so of course this fails. To get matchesPlayedOn, use row.get(1) or simply row(1).