机器学习算法的 Flink HBase 输入
Flink HBase input for machine learning algorithms
我想使用 Flink-HBase 插件读取数据,然后作为 Flink 机器学习算法(分别为 SVM 和 MLR)的输入。现在我先把提取出来的数据写到一个临时文件中,然后通过libSVM的方法读入,但我想应该有更复杂的方法。
您有代码片段或想法吗?
无需将数据写入磁盘再用MLUtils.readLibSVM
读取。原因如下。
MLUtils.readLibSVM
需要一个文本文件,其中每一行都是具有相关标签的稀疏特征向量。它使用以下格式表示标签-特征向量对:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
其中<feature>
是后面value
在特征向量中的索引。 MLUtils.readLibSVM
可以读取这种格式的文件并在 LabeledVector
实例中转换每一行。因此,您在读取 libSVM 文件后获得 DataSet[LabeledVector]
。这正是 SVM
和 MultipleLinearRegression
预测器所需的输入格式。
但是,根据您从 HBase 获取的数据格式,您首先必须将数据转换为 libSVM
格式。否则,MLUtils.readLibSVM
将无法读取写入的文件。而如果你转换数据,那么你也可以直接将你的数据转换成一个 DataSet[LabeledVector]
并作为 Flink 的 ML 算法的输入。这避免了不必要的磁盘循环。
如果您从 HBase 获得一个 DataSet[String]
,其中每个字符串都具有 libSVM
格式(请参阅上面的规范),那么您可以在 HBase DataSet
具有以下地图功能。
val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
line =>
// remove all comments which start with a '#'
val commentFreeLine = line.takeWhile(_ != '#').trim
if(commentFreeLine.nonEmpty) {
val splits = commentFreeLine.split(' ')
val label = splits.head.toDouble
val sparseFeatures = splits.tail
val coos = sparseFeatures.map {
str =>
val pair = str.split(':')
require(
pair.length == 2,
"Each feature entry has to have the form <feature>:<value>")
// libSVM index is 1-based, but we expect it to be 0-based
val index = pair(0).toInt - 1
val value = pair(1).toDouble
(index, value)
}
Some((label, coos))
} else {
None
}
// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
labelCOO =>
labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))
val labeledVectors: DataSet[LabeledVector] =
labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
var dimension = 0
override def open(configuration: Configuration): Unit = {
dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0)
}
override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
}
}}.withBroadcastSet(dimensionDS, DIMENSION)
这会将你的libSVM格式数据转换成LabeledVectors
的数据集。
我想使用 Flink-HBase 插件读取数据,然后作为 Flink 机器学习算法(分别为 SVM 和 MLR)的输入。现在我先把提取出来的数据写到一个临时文件中,然后通过libSVM的方法读入,但我想应该有更复杂的方法。
您有代码片段或想法吗?
无需将数据写入磁盘再用MLUtils.readLibSVM
读取。原因如下。
MLUtils.readLibSVM
需要一个文本文件,其中每一行都是具有相关标签的稀疏特征向量。它使用以下格式表示标签-特征向量对:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
其中<feature>
是后面value
在特征向量中的索引。 MLUtils.readLibSVM
可以读取这种格式的文件并在 LabeledVector
实例中转换每一行。因此,您在读取 libSVM 文件后获得 DataSet[LabeledVector]
。这正是 SVM
和 MultipleLinearRegression
预测器所需的输入格式。
但是,根据您从 HBase 获取的数据格式,您首先必须将数据转换为 libSVM
格式。否则,MLUtils.readLibSVM
将无法读取写入的文件。而如果你转换数据,那么你也可以直接将你的数据转换成一个 DataSet[LabeledVector]
并作为 Flink 的 ML 算法的输入。这避免了不必要的磁盘循环。
如果您从 HBase 获得一个 DataSet[String]
,其中每个字符串都具有 libSVM
格式(请参阅上面的规范),那么您可以在 HBase DataSet
具有以下地图功能。
val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
line =>
// remove all comments which start with a '#'
val commentFreeLine = line.takeWhile(_ != '#').trim
if(commentFreeLine.nonEmpty) {
val splits = commentFreeLine.split(' ')
val label = splits.head.toDouble
val sparseFeatures = splits.tail
val coos = sparseFeatures.map {
str =>
val pair = str.split(':')
require(
pair.length == 2,
"Each feature entry has to have the form <feature>:<value>")
// libSVM index is 1-based, but we expect it to be 0-based
val index = pair(0).toInt - 1
val value = pair(1).toDouble
(index, value)
}
Some((label, coos))
} else {
None
}
// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
labelCOO =>
labelCOO._2.map( _._1 + 1 ).max
}.reduce(scala.math.max(_, _))
val labeledVectors: DataSet[LabeledVector] =
labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
var dimension = 0
override def open(configuration: Configuration): Unit = {
dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0)
}
override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
}
}}.withBroadcastSet(dimensionDS, DIMENSION)
这会将你的libSVM格式数据转换成LabeledVectors
的数据集。