从 DataFrame 中的值构建 SparseVector

Building a SparseVector out of values in a DataFrame

我一直在尝试从由 Double 组成的 1 列 Spark Dataframe 中提取信息,并将其放入 Breeze SparseVector 中。为此,我检查了 1 列 DataFrame 的每个元素,将其强制为 Double,然后将其添加到 VectorBuilder。我的 VectorBuilder 在 foreach 循环中正确地改变其状态,然后在循环结束后清除所有更改。为什么会这样?有解决方法吗?

编辑 1:

我 运行 在本地使用 1 个内核;它不在集群上

代码:

val accum = sc.accumulator(0, "Counter") 

def correlate() : Unit = {

  val cols = df.columns
  val id = cols(0)       
  val id2 = cols(1)

  //id1 and id2 are there for 
  val df1 = sqlContext.sql(s"SELECT ${id} FROM dataset WHERE (${id} IS NOT NULL AND ${id2} IS NOT NULL)")

  /* df1 is a dataframe that has 1 column*/   
  df1.show();
  accum.value_=(0);

  /******************** Problem starts here **********************/
  val builder = new VectorBuilder[Double](5)
  df1.foreach{ x =>
    x(0) match{             
      case d : Double => 
        builder.add(accum.value, d); 
        //This print statement prints out correct values
        println(s"index: ${accum.value} value: ${builder(accum.value)}")      
        accum.value += 1;
        println(s"builder's active size in loop: ${builder.activeSize}")  
      case _ => throw new ClassCastException("Pattern-Matching for Double failed");
    } 
  }
  //temp becomes empty at this point
  println(s"builder's active size out of loop: ${builder.activeSize}")

  val sparse = builder.toSparseVector     
  sparse.foreachPair{(i,v) => println(s"index: ${i} and value: ${v}")}
}
this.correlate()

输出:

+-------+
|   RowX|
+-------+
|  145.0|
|   -1.0|
|-212.21|
|   23.3|
|   21.4|
+-------+

index: 0 value: 145.0
builder's active size in loop: 1
index: 1 value: -1.0
builder's active size in loop: 2
index: 2 value: -212.21
builder's active size in loop: 3
index: 3 value: 23.3
builder's active size in loop: 4
index: 4 value: 21.4
builder's active size in loop: 5

//the loop ends here and builder's state disappears

builder's active size out of loop: 0
index: 0 and value: 0.0
index: 1 and value: 0.0
index: 2 and value: 0.0
index: 3 and value: 0.0
index: 4 and value: 0.0

它为每个工人添加了构建器的本地副本。获取本地对象收集:

SparseVector(df1.rdd.map(_.getDouble(0)).collect)