对预排序输入进行 Spark 特征向量转换
Spark Feature Vector Transformation on Pre-Sorted Input
我在 HDFS 上的制表符分隔文件中有一些数据,如下所示:
label | user_id | feature
------------------------------
pos | 111 | www.abc.com
pos | 111 | www.xyz.com
pos | 111 | Firefox
pos | 222 | www.example.com
pos | 222 | www.xyz.com
pos | 222 | IE
neg | 333 | www.jkl.com
neg | 333 | www.xyz.com
neg | 333 | Chrome
我需要对其进行转换,为每个 user_id 创建一个特征向量来训练 org.apache.spark.ml.classification.NaiveBayes
模型。
我目前的方法基本上如下:
- 将原始数据加载到 DataFrame 中
- 使用 StringIndexer 索引特征
- 向下找到 RDD 并按 user_id 分组并将特征索引映射到稀疏向量中。
关键在于...数据已由 user_id 预先排序。利用它的最佳方法是什么?一想到可能发生了多少不必要的工作,我就很痛苦。
如果一些代码有助于理解我当前的方法,这里是地图的本质:
val featurization = (vals: (String,Iterable[Row])) => {
// create a Seq of all the feature indices
// Note: the indexing was done in a previous step not shown
val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq
// create the sparse vector
val featureVector = Vectors.sparse(maxIndex, seq)
// convert the string label into a Double
val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0
(label, vals._1, featureVector)
}
d.rdd
.groupBy(_.getString(1))
.map(featurization)
.toDF("label","user_id","features")
让我们从 your other question
开始
If my data on disk is guaranteed to be pre-sorted by the key which will be used for a group aggregation or reduce, is there any way for Spark to take advantage of that?
视情况而定。如果您应用的操作可以从 map-side 聚合中获益,那么您可以通过预排序数据获得很多好处,而无需进一步干预您的代码。共享相同密钥的数据应该位于相同的分区上,并且可以在随机播放之前在本地聚合。
不幸的是,在这种特殊情况下它不会有太大帮助。即使您启用地图端聚合(groupBy(Key)
不使用 is,因此您需要自定义实现)或聚合特征向量(您会在我对 的回答中找到一些示例)收获不大。您可以在这里和那里节省一些工作,但您仍然必须在节点之间传输所有索引。
如果你想获得更多,你将不得不做更多的工作。我可以看到两种可以利用现有订单的基本方法:
使用自定义 Hadoop 输入格式只生成完整的记录(标签、id、所有特征),而不是逐行读取数据。如果您的数据每个 id 的行数固定,您甚至可以尝试使用 NLineInputFormat
并在之后应用 mapPartitions
来聚合记录。
这绝对是更冗长的解决方案,但不需要在 Spark 中进行额外的改组。
照常读取数据,但对 groupBy
使用自定义分区程序。据我所知,使用 rangePartitioner
应该可以正常工作,但请确保您可以尝试以下过程:
- 使用
mapPartitionsWithIndex
查找每个分区的最小/最大 ID。
- 创建分区器,在当前 (i-th) 分区上保持最小值 <= ids < 最大值,并将最大值推送到分区 i + 1
- 将此分区程序用于
groupBy(Key)
这可能是更友好的解决方案,但至少需要一些改组。如果预期要移动的记录数很低 (<< #records-per-partition),您甚至可以使用 mapPartitions
和 broadcast
* 处理此问题而无需随机播放,尽管分区可能更有用且更便宜在实践中。
* 您可以使用与此类似的方法:
我在 HDFS 上的制表符分隔文件中有一些数据,如下所示:
label | user_id | feature
------------------------------
pos | 111 | www.abc.com
pos | 111 | www.xyz.com
pos | 111 | Firefox
pos | 222 | www.example.com
pos | 222 | www.xyz.com
pos | 222 | IE
neg | 333 | www.jkl.com
neg | 333 | www.xyz.com
neg | 333 | Chrome
我需要对其进行转换,为每个 user_id 创建一个特征向量来训练 org.apache.spark.ml.classification.NaiveBayes
模型。
我目前的方法基本上如下:
- 将原始数据加载到 DataFrame 中
- 使用 StringIndexer 索引特征
- 向下找到 RDD 并按 user_id 分组并将特征索引映射到稀疏向量中。
关键在于...数据已由 user_id 预先排序。利用它的最佳方法是什么?一想到可能发生了多少不必要的工作,我就很痛苦。
如果一些代码有助于理解我当前的方法,这里是地图的本质:
val featurization = (vals: (String,Iterable[Row])) => {
// create a Seq of all the feature indices
// Note: the indexing was done in a previous step not shown
val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq
// create the sparse vector
val featureVector = Vectors.sparse(maxIndex, seq)
// convert the string label into a Double
val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0
(label, vals._1, featureVector)
}
d.rdd
.groupBy(_.getString(1))
.map(featurization)
.toDF("label","user_id","features")
让我们从 your other question
开始If my data on disk is guaranteed to be pre-sorted by the key which will be used for a group aggregation or reduce, is there any way for Spark to take advantage of that?
视情况而定。如果您应用的操作可以从 map-side 聚合中获益,那么您可以通过预排序数据获得很多好处,而无需进一步干预您的代码。共享相同密钥的数据应该位于相同的分区上,并且可以在随机播放之前在本地聚合。
不幸的是,在这种特殊情况下它不会有太大帮助。即使您启用地图端聚合(groupBy(Key)
不使用 is,因此您需要自定义实现)或聚合特征向量(您会在我对
如果你想获得更多,你将不得不做更多的工作。我可以看到两种可以利用现有订单的基本方法:
使用自定义 Hadoop 输入格式只生成完整的记录(标签、id、所有特征),而不是逐行读取数据。如果您的数据每个 id 的行数固定,您甚至可以尝试使用
NLineInputFormat
并在之后应用mapPartitions
来聚合记录。这绝对是更冗长的解决方案,但不需要在 Spark 中进行额外的改组。
照常读取数据,但对
groupBy
使用自定义分区程序。据我所知,使用rangePartitioner
应该可以正常工作,但请确保您可以尝试以下过程:- 使用
mapPartitionsWithIndex
查找每个分区的最小/最大 ID。 - 创建分区器,在当前 (i-th) 分区上保持最小值 <= ids < 最大值,并将最大值推送到分区 i + 1
- 将此分区程序用于
groupBy(Key)
这可能是更友好的解决方案,但至少需要一些改组。如果预期要移动的记录数很低 (<< #records-per-partition),您甚至可以使用
mapPartitions
和broadcast
* 处理此问题而无需随机播放,尽管分区可能更有用且更便宜在实践中。- 使用
* 您可以使用与此类似的方法: