如何在实现 spark fp 增长的同时获取 RDD 中的字符串值?

How to get string values in RDD while implementing spark fp growth?

我在下面的查询中成功加入了 userIDmatch

var queryToGroupCustomers = "SELECT yt.userID as player," +
  " concat_ws(\",\", collect_set(match)) AS matchesPlayedOn" + //concat_ws()
  " FROM recommendationengine.sportsbookbets_orc yt" +
  " where yt.userID is not null " + leagueCondition + "'" +
  " GROUP BY yt.userID"

现在我想将列传递到 RDD 中以在算法中使用。我对此的实现是使用通用行格式 val transactions: RDD[Array[String]] = results.rdd.map( row => row.get(2).toString.split(",")),但出现以下错误;

17/03/27 23:28:51 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 29)
java.lang.ArrayIndexOutOfBoundsException: 2
    at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:200)

连接数据集的示例如下;

ff6e96d4-e243-4046-8e02-ce3d4b459a5d    Napoli - Crotone, AC Milan - Juventus, Torino - Juventus, AS Roma - AC Milan, Empoli - Bologna, AC Milan - Internazionale, Genoa - AC Milan, Sassuolo - Chievo Verona, Sassuolo - Genoa

到目前为止,我对该算法的完整实现如下;

// Has all customers and their bets
var queryToGroupCustomers = "SELECT yt.userID as player," +
  " concat_ws(\",\", collect_set(match)) AS matchesPlayedOn" + //concat_ws()
  " FROM recommendationengine.sportsbookbets_orc yt" +
  " where yt.userID is not null " + leagueCondition + "'" +
  " GROUP BY yt.userID"

println("Executing query: \n\n" + queryToGroupCustomers)
var results = hc.sql(queryToGroupCustomers).cache()
val transactions: RDD[Array[String]] = results.rdd.map( row => row.get(2).toString.split(","))

// Set configurations for FP-Growth
val fpg = new FPGrowth()
  .setMinSupport(0.5)
  .setNumPartitions(10)

// Generate model
val model = fpg.run(transactions);

println("\n\n Starting FPGrowth\n\n")

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

如果有任何建议,我将不胜感激...谢谢

您有一行包含 2 个字段,row.get(2) 获取其第三个字段的值(通常,行中的字段从 0 开始);当然这是一个错误。要获得 matchesPlayedOn,请使用 row.get(1) 或仅使用 row(1).