Apache Flink - 流数据的 svm 预测
Apache Flink - svm predictions on streaming data
我正在使用 Apache Flink 预测来自 Twitter 的流。
代码在 Scala 中实现
我的问题是,我从数据集 API 训练的 SVM 模型需要一个数据集作为 predict()-方法的输入。
我在这里已经看到一个问题,其中一位用户说,您需要编写一个自己的 MapFunction,它在作业开始时读取模型(参考:)
但是我不能write/understand这个代码。
即使我在 StreamingMapFunction 中获取模型。我仍然需要一个数据集作为参数来预测结果。
我真的希望有人能show/explain告诉我这是怎么做到的。
Flink-版本:1.9
Scala 版本:2.11
Flink-ML:2.11
val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)
//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
.setBlocks(env.getParallelism)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
.setSeed(42)
//learning
svm.fit(trainingData)
//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))
//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
因此,目前 SVM
所属的 FlinkML 不支持流 API。这就是 SVM
只接受 DataSet
的原因。这个想法不是使用 FlinkML,而是使用 scala 或 java 中可用的一些 SVM 库。然后你可以读取模型,例如从文件中读取。问题是你必须自己实现大部分逻辑。
您提到的 post 中的评论或多或少说的是完全相同的事情。
我正在使用 Apache Flink 预测来自 Twitter 的流。
代码在 Scala 中实现
我的问题是,我从数据集 API 训练的 SVM 模型需要一个数据集作为 predict()-方法的输入。
我在这里已经看到一个问题,其中一位用户说,您需要编写一个自己的 MapFunction,它在作业开始时读取模型(参考:
但是我不能write/understand这个代码。
即使我在 StreamingMapFunction 中获取模型。我仍然需要一个数据集作为参数来预测结果。
我真的希望有人能show/explain告诉我这是怎么做到的。
Flink-版本:1.9 Scala 版本:2.11 Flink-ML:2.11
val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)
//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
.setBlocks(env.getParallelism)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
.setSeed(42)
//learning
svm.fit(trainingData)
//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))
//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)
因此,目前 SVM
所属的 FlinkML 不支持流 API。这就是 SVM
只接受 DataSet
的原因。这个想法不是使用 FlinkML,而是使用 scala 或 java 中可用的一些 SVM 库。然后你可以读取模型,例如从文件中读取。问题是你必须自己实现大部分逻辑。
您提到的 post 中的评论或多或少说的是完全相同的事情。