对预排序输入进行 Spark 特征向量转换

Spark Feature Vector Transformation on Pre-Sorted Input

我在 HDFS 上的制表符分隔文件中有一些数据,如下所示:

label | user_id | feature
------------------------------
  pos | 111     | www.abc.com
  pos | 111     | www.xyz.com
  pos | 111     | Firefox
  pos | 222     | www.example.com
  pos | 222     | www.xyz.com
  pos | 222     | IE
  neg | 333     | www.jkl.com
  neg | 333     | www.xyz.com
  neg | 333     | Chrome

我需要对其进行转换,为每个 user_id 创建一个特征向量来训练 org.apache.spark.ml.classification.NaiveBayes 模型。

我目前的方法基本上如下:

  1. 将原始数据加载到 DataFrame 中
  2. 使用 StringIndexer 索引特征
  3. 向下找到 RDD 并按 user_id 分组并将特征索引映射到稀疏向量中。

关键在于...数据已由 user_id 预先排序。利用它的最佳方法是什么?一想到可能发生了多少不必要的工作,我就很痛苦。

如果一些代码有助于理解我当前的方法,这里是地图的本质:

val featurization = (vals: (String,Iterable[Row])) => {
  // create a Seq of all the feature indices
  // Note: the indexing was done in a previous step not shown
  val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq

  // create the sparse vector
  val featureVector = Vectors.sparse(maxIndex, seq)

  // convert the string label into a Double
  val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0

  (label, vals._1, featureVector)
}

d.rdd
  .groupBy(_.getString(1))
  .map(featurization)
  .toDF("label","user_id","features")

让我们从 your other question

开始

If my data on disk is guaranteed to be pre-sorted by the key which will be used for a group aggregation or reduce, is there any way for Spark to take advantage of that?

视情况而定。如果您应用的操作可以从 map-side 聚合中获益,那么您可以通过预排序数据获得很多好处,而无需进一步干预您的代码。共享相同密钥的数据应该位于相同的分区上,并且可以在随机播放之前在本地聚合。

不幸的是,在这种特殊情况下它不会有太大帮助。即使您启用地图端聚合(groupBy(Key) 不使用 is,因此您需要自定义实现)或聚合特征向量(您会在我对 的回答中找到一些示例)收获不大。您可以在这里和那里节省一些工作,但您仍然必须在节点之间传输所有索引。

如果你想获得更多,你将不得不做更多的工作。我可以看到两种可以利用现有订单的基本方法:

  1. 使用自定义 Hadoop 输入格式只生成完整的记录(标签、id、所有特征),而不是逐行读取数据。如果您的数据每个 id 的行数固定,您甚至可以尝试使用 NLineInputFormat 并在之后应用 mapPartitions 来聚合记录。

    这绝对是更冗长的解决方案,但不需要在 Spark 中进行额外的改组。

  2. 照常读取数据,但对 groupBy 使用自定义分区程序。据我所知,使用 rangePartitioner 应该可以正常工作,但请确保您可以尝试以下过程:

    • 使用mapPartitionsWithIndex 查找每个分区的最小/最大 ID。
    • 创建分区器,在当前 (i-th) 分区上保持最小值 <= ids < 最大值,并将最大值推送到分区 i + 1
    • 将此分区程序用于 groupBy(Key)

    这可能是更友好的解决方案,但至少需要一些改组。如果预期要移动的记录数很低 (<< #records-per-partition),您甚至可以使用 mapPartitionsbroadcast* 处理此问题而无需随机播放,尽管分区可能更有用且更便宜在实践中。


* 您可以使用与此类似的方法: