Real-time streaming prediction in Flink using Scala
Flink version: 1.2.0
Scala version: 2.11.8
I want to run predictions on a DataStream in Flink using Scala. I have a DataStream[String] in Flink (Scala) that contains JSON-formatted data from a Kafka source, and I want to use this DataStream to predict with an already trained FlinkML model. The problem is that all the FlinkML examples use the DataSet API for prediction. I am fairly new to Flink and Scala, so any help in the form of a code solution would be much appreciated.
Input:
{"FC196":"Dormant","FC174":"Yolo","FC195":"Lol","FC176":"4","FC198":"BANKING","FC175":"ABDULMAJEED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053570","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"","FC188":"BR01","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}
Code:
package org.apache.flink.quickstart

// imports
import java.util.Properties

import org.apache.flink.api.scala._
// FlinkML imports (not used yet, kept for the intended prediction step)
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Kafka consumer imports
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Table API import (only needed for the commented-out tableEnv line below)
import org.apache.flink.table.api.TableEnvironment

// json4s imports
import org.json4s.native.JsonMethods

// case class mirroring the fields of the JSON input
case class CC(
  FC196: String, FC174: String, FC195: String, FC176: String, FC198: String,
  FC175: String, FC197: String, FC178: String, FC177: String, FC199: String,
  FC179: String, FC190: String, FC192: String, FC191: String, FC194: String,
  FC193: String, FC203: String, FC205: String, FC185: String, FC184: String,
  FC187: String, FC186: String, FC189: String, FC200: String, FC188: String,
  FC202: String, FC201: String, FC181: String, FC180: String, FC183: String,
  FC182: String)

object WordCount {

  // default json4s formats, required by extract[CC]
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {
    // Kafka consumer properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id", "grouop")
    properties.setProperty("auto.offset.reset", "earliest")

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // val tableEnv = TableEnvironment.getTableEnvironment(env)

    // consume raw JSON strings from the "new" topic and parse them;
    // .toOption yields None for empty (JNothing) results, which flatMap drops
    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)

    // deserialize each parsed JSON value into the CC case class
    val mapped = st.map(_.extract[CC])
    mapped.print()

    env.execute()
  }
}
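One gap between this code and an actual FlinkML model is that FlinkML models score vectors rather than case classes, so the parsed CC records have to be turned into feature vectors at some point. A minimal, hypothetical sketch is below; which fields are features (and the toFeatureVector name itself) are assumptions, since that depends entirely on how the model was trained:

import org.apache.flink.ml.math.DenseVector

// hypothetical feature extraction: FC176, FC178 and FC183 are picked purely
// for illustration; use whatever fields the model was actually trained on
def toFeatureVector(cc: CC): DenseVector =
  DenseVector(cc.FC176.toDouble, cc.FC178.toDouble, cc.FC183.toDouble)

// applied to the parsed stream from main:
// val features = mapped.map(toFeatureVector _)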
The way to solve this problem is to write a MapFunction that reads the model upon the start of the job. The MapFunction will then store the model as part of its internal state, so that it is automatically restored in case of a failure:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// enclosing class (its name is assumed) so that the static members compile
public class PredictionJob {

    public static void main(String[] args) throws Exception {
        // obtain execution environment, run this example in "ingestion time"
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<Value> input = ...; // read from Kafka for example

        DataStream<Prediction> prediction = input.map(new Predictor());

        prediction.print();

        env.execute();
    }

    public static class Predictor implements MapFunction<Value, Prediction>, CheckpointedFunction {

        private transient ListState<Model> modelState;

        private transient Model model;

        @Override
        public Prediction map(Value value) throws Exception {
            return model.predict(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // we don't have to do anything here because we assume the model to be constant
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Model> listStateDescriptor = new ListStateDescriptor<>("model", Model.class);

            modelState = context.getOperatorStateStore().getUnionListState(listStateDescriptor);

            if (context.isRestored()) {
                // restore the model from state
                model = modelState.get().iterator().next();
            } else {
                modelState.clear();

                // read the model from somewhere, e.g. read from a file
                model = ...;

                // update the modelState so that it is checkpointed from now on
                modelState.add(model);
            }
        }
    }

    public static class Model {}

    public static class Value {}

    public static class Prediction {}
}
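Since the question asks for Scala, the same pattern can be written as a minimal Scala sketch against the CC case class from the question. Model, Prediction and Model.load below are hypothetical stand-ins for whatever trained model and loading logic you actually have; only the state handling mirrors the answer above:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// hypothetical placeholder types for the sketch
case class Prediction(score: Double)

class Model extends Serializable {
  def predict(in: CC): Prediction = Prediction(0.0) // stand-in scoring logic
}

object Model {
  def load(path: String): Model = new Model // stand-in loader
}

class Predictor extends MapFunction[CC, Prediction] with CheckpointedFunction {

  @transient private var modelState: ListState[Model] = _
  @transient private var model: Model = _

  override def map(value: CC): Prediction = model.predict(value)

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // nothing to do here: the model is assumed to be constant
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Model]("model", classOf[Model])
    modelState = context.getOperatorStateStore.getUnionListState(descriptor)

    if (context.isRestored) {
      // restored after a failure: take the model from state
      model = modelState.get().iterator().next()
    } else {
      modelState.clear()
      // first start: load the model from somewhere, e.g. a file
      model = Model.load("/path/to/model")
      // register the model in state so it is checkpointed from now on
      modelState.add(model)
    }
  }
}

With this in place, the question's pipeline would end with val predictions = mapped.map(new Predictor()) instead of mapped.print(), and checkpointing (env.enableCheckpointing(...)) has to be enabled for the restore path to ever be taken.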