使用 groupByKey 组合值以与 FPGrowth 一起使用

Using groupByKey to combine values for using with FPGrowth

我有 User、Item 形式的文件,我想将其与 Spark Itemsets 一起使用。我已经这样做了:

val data = sc.textFile("myfile")
             .map(line => (line.trim.split(' ')(0), line.trim.split(' ')(1)))
             .groupByKey()
val fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10)
val model = fpg.run(data)

但它抱怨

inferred type arguments [Nothing,(String, Iterable[String])] do not conform to method run's type parameter bounds [Item,Basket <: Iterable[Item]]

Basket 必须是 java.lang.Iterable 所以 Tuple2 或 Scala Iterable 都不能在这里工作。在将数据传递给 run 方法之前,只需放下键并将篮子转换为 Array

val data = sc.parallelize(Seq("1 a", "1 b", "2 b", "2 c"))
  .map(_.split(" ") match {
    case Array(id, item, _*) => (id, item)
  })
  .groupByKey()
  .values  // Take only values
  .map(_.toArray)  // Convert to Array

val fpg = new FPGrowth().setMinSupport(0.2).setNumPartitions(10)
val model = fpg.run(data)