Apache Flink - 预测处理
Apache Flink - Prediction Handling
我目前正在使用 Apache Flink 的 SVM-Class 来预测一些文本数据。
class 提供了一个预测函数,它以 DataSet[Vector] 作为输入并给我一个 DataSet[Prediction] 作为结果。到目前为止一切顺利。
我的问题是,我没有哪个预测属于哪个文本的上下文,我无法在 predict() 函数中插入文本以便之后使用它。
代码:
val tweets: DataSet[(SparseVector, String)] =
source.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
.map(tweet => (featureVectorService.transform(tweet._2))
model.predict(tweets).print
result example:
(SparseVector((462,8.73165920153676), (10844,8.508515650222549), (15656,2.931052542245018)),-1.0)
有没有办法将其他数据保留在预测旁边以将所有内容放在一起?因为没有上下文,预测对我没有帮助。
或者也许有一种方法可以只预测一个向量而不是数据集,我可以在上面的 map 函数中调用该函数。
SVM
预测器期望 Vector
的子类型作为输入。因此有两种方案可以解决这个问题:
- 创建
Vector
的子类型,其中包含推文文本作为标签。然后它将通过预测器循环。这种方法的优点是不需要额外的操作。但是,需要定义新的 类 实用程序来表示带有标签的不同矢量类型:
val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements("foobar", "barfo", "test")
val vectorizedInput = input.map(word => {
val value = word.chars().sum()
new DenseVectorWithTag(Array(value), word)
})
val svm = SVM().setBlocks(env.getParallelism)
val weights = env.fromElements(DenseVector(1.0))
svm.weightsOption = Option(weights) // skipping the training here
val predictionResult: DataSet[(DenseVectorWithTag, Double)] = svm.predict(vectorizedInput)
class DenseVectorWithTag(override val data: Array[Double], tag: String)
extends DenseVector(data) {
override def toString: String = "(" + super.toString + ", " + tag + ")"
}
- 在
tweets
的向量化表示上将预测 DataSet
与输入 DataSet
相结合。这种方式的好处是我们不需要引入new类。我们为此付出的代价是额外的连接操作,这可能很昂贵:
val input = env.fromElements("foobar", "barfo", "test")
val vectorizedInput = input.map(word => {
val value = word.chars().sum()
(DenseVector(value), word)
})
val svm = SVM().setBlocks(env.getParallelism)
val weights = env.fromElements(DenseVector(1.0))
svm.weightsOption = Option(weights) // skipping the training here
val predictionResult = svm.predict(vectorizedInput.map(a => a._1))
val inputWithPrediction: DataSet[(String, Double)] = vectorizedInput
.join(predictionResult)
.where(0)
.equalTo(0)
.apply((t, p) => (t._2, p._2))
我目前正在使用 Apache Flink 的 SVM-Class 来预测一些文本数据。
class 提供了一个预测函数,它以 DataSet[Vector] 作为输入并给我一个 DataSet[Prediction] 作为结果。到目前为止一切顺利。
我的问题是,我没有哪个预测属于哪个文本的上下文,我无法在 predict() 函数中插入文本以便之后使用它。
代码:
val tweets: DataSet[(SparseVector, String)] =
source.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
.map(tweet => (featureVectorService.transform(tweet._2))
model.predict(tweets).print
result example:
(SparseVector((462,8.73165920153676), (10844,8.508515650222549), (15656,2.931052542245018)),-1.0)
有没有办法将其他数据保留在预测旁边以将所有内容放在一起?因为没有上下文,预测对我没有帮助。
或者也许有一种方法可以只预测一个向量而不是数据集,我可以在上面的 map 函数中调用该函数。
SVM
预测器期望 Vector
的子类型作为输入。因此有两种方案可以解决这个问题:
- 创建
Vector
的子类型,其中包含推文文本作为标签。然后它将通过预测器循环。这种方法的优点是不需要额外的操作。但是,需要定义新的 类 实用程序来表示带有标签的不同矢量类型:
val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements("foobar", "barfo", "test")
val vectorizedInput = input.map(word => {
val value = word.chars().sum()
new DenseVectorWithTag(Array(value), word)
})
val svm = SVM().setBlocks(env.getParallelism)
val weights = env.fromElements(DenseVector(1.0))
svm.weightsOption = Option(weights) // skipping the training here
val predictionResult: DataSet[(DenseVectorWithTag, Double)] = svm.predict(vectorizedInput)
class DenseVectorWithTag(override val data: Array[Double], tag: String)
extends DenseVector(data) {
override def toString: String = "(" + super.toString + ", " + tag + ")"
}
- 在
tweets
的向量化表示上将预测DataSet
与输入DataSet
相结合。这种方式的好处是我们不需要引入new类。我们为此付出的代价是额外的连接操作,这可能很昂贵:
val input = env.fromElements("foobar", "barfo", "test")
val vectorizedInput = input.map(word => {
val value = word.chars().sum()
(DenseVector(value), word)
})
val svm = SVM().setBlocks(env.getParallelism)
val weights = env.fromElements(DenseVector(1.0))
svm.weightsOption = Option(weights) // skipping the training here
val predictionResult = svm.predict(vectorizedInput.map(a => a._1))
val inputWithPrediction: DataSet[(String, Double)] = vectorizedInput
.join(predictionResult)
.where(0)
.equalTo(0)
.apply((t, p) => (t._2, p._2))