Flink error with 'The implementation of the StreamExecutionEnvironment is not serializable'
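I'm a Flink beginner and I'm trying to use Flink to run LFM, a recommendation algorithm, but my code throws the error below when I run it. I tried to track down the cause and fix it, without success. Can anyone tell me why I'm running into this problem?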
Here is the main exception:
The implementation of the StreamExecutionEnvironment is not serializable
And here is my code. Note that sourceDataStream comes from my custom source, which extends RichFunction<Tuple3<>>:
//training model
for (int iter = 0; iter < iterations; iter++) {
    sourceDataStream
        // the exception appears here
        .process(new ProcessFunction<Tuple3<String, String, Double>, Object>() {
            @Override
            public void processElement(Tuple3<String, String, Double> in,
                    ProcessFunction<Tuple3<String, String, Double>, Object>.Context context,
                    Collector<Object> collector) throws Exception {
                Double hat_rui = predict(in.f0, in.f1, qbiTable, pbuTable, streamTableEnvironment);
                Double err_ui = in.f2 - hat_rui;
                Table pbuSelectTable = pbuTable.select($("buValue"), $("pList")).where($("userId").isEqual(in.f0));
                Table qbiSelectTable = qbiTable.select($("biValue"), $("qList")).where($("itemId").isEqual(in.f1));
                DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> pbuSelectDataStream = streamTableEnvironment.toRetractStream(pbuSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                pbuSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {
                    @Override
                    public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> userTuple,
                            ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                            Collector<Object> collector) throws Exception {
                        DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> qbiSelectDataStream = streamTableEnvironment.toRetractStream(qbiSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                        qbiSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {
                            @Override
                            public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> itemTuple,
                                    ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                                    Collector<Object> collector) throws Exception {
                                double bu = userTuple.f1.f0 + (alpha * (err_ui - lambd * userTuple.f1.f0));
                                double bi = itemTuple.f1.f0 + (alpha * (err_ui - lambd * itemTuple.f1.f0));
                                List<Double> pList = new ArrayList<>();
                                List<Double> qList = new ArrayList<>();
                                for (int fIter = 0; fIter < F; fIter++) {
                                    Double pValueLast = userTuple.f1.f1.get(fIter);
                                    Double qValueLast = itemTuple.f1.f1.get(fIter);
                                    Double qValueNew = qValueLast + (alpha * (err_ui * pValueLast - lambd * qValueLast));
                                    Double pValueNew = pValueLast + (alpha * (err_ui * qValueLast - lambd * pValueLast));
                                    pList.add(pValueNew);
                                    qList.add(qValueNew);
                                }
                                streamTableEnvironment.executeSql("INSERT OVERWRITE qbiTable VALUES ('qList', " + qList + "), ('biValue', " + bi + ")");
                                streamTableEnvironment.executeSql("INSERT OVERWRITE pbuTable VALUES ('pList', " + pList + "), ('buValue', " + bu + ")");
                            }
                        });
                    }
                });
            }
        });
}
A few things about this aren't going to work:
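Inside the implementation of any user function (such as a ProcessFunction), you cannot use a DataStream, a Table, a StreamExecutionEnvironment, or another ProcessFunction. Flink serializes each user function so it can ship it to the task managers, and your anonymous ProcessFunction references streamTableEnvironment and the Tables, which in turn hold the non-serializable StreamExecutionEnvironment; that is why you get this exception. All you can do inside a function is react to each incoming stream record, optionally using state that you have built up within that function from previously processed records.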
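The serialization failure can be reproduced in plain Java, without Flink. This is a minimal sketch that assumes Flink's check behaves like standard Java serialization (Flink user functions do implement `Serializable` and are shipped in serialized form). `FakeEnvironment`, `CapturingFunction`, `SelfContainedFunction` and `SerializationDemo` are hypothetical stand-ins, not Flink classes. A function that holds a reference to a non-serializable object cannot be written out, while one that keeps only its own simple state can.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for StreamExecutionEnvironment: it does NOT implement Serializable.
class FakeEnvironment {
}

// Hypothetical stand-in for a user function. Like Flink functions it is Serializable,
// but it keeps a reference to the non-serializable environment.
class CapturingFunction implements Serializable {
    private final FakeEnvironment env;

    CapturingFunction(FakeEnvironment env) {
        this.env = env;
    }
}

// A function whose only field is its own serializable state.
class SelfContainedFunction implements Serializable {
    private double runningSum = 0.0; // state built from previously processed records

    double process(double record) {
        runningSum += record;
        return runningSum;
    }
}

class SerializationDemo {
    // Returns true if the object (and everything it references) can be serialized.
    static boolean isSerializable(Object function) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(function);
            return true;
        } catch (NotSerializableException e) {
            // The message is the name of the first non-serializable class encountered.
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new CapturingFunction(new FakeEnvironment())));
        System.out.println(isSerializable(new SelfContainedFunction()));
    }
}
```

Running `main` prints `false` for the capturing function and `true` for the self-contained one. In the question's code, `streamTableEnvironment`, `pbuTable` and `qbiTable` play roughly the role of `FakeEnvironment`.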
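The DataStream and Table APIs are organized around a builder paradigm, which you use to describe a streaming dataflow pipeline. This pipeline must be a directed acyclic graph: it can split and merge, but it must flow from sources to sinks without any loops. The stages of that pipeline (e.g., a ProcessFunction) must be coded as independent blocks: they cannot reach outside of themselves to access data from other pipeline stages.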
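To make the builder paradigm concrete, here is a toy model in plain Java. It is not Flink's API: `ToyPipeline` and `ToyPipelineDemo` are hypothetical, and the model only covers a linear chain (no splits or merges). Calling `map` merely records a stage; nothing runs until `execute()` is called, and each stage is a function of its input record alone, so it has no way to reach into another stage's data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy model of a builder-style pipeline (hypothetical, NOT Flink's API).
// Only a linear chain of map stages is supported: no splits, merges or loops.
final class ToyPipeline<S, T> {
    private final List<S> source;        // bounded input standing in for a source
    private final Function<S, T> stages; // composed description of all stages so far

    private ToyPipeline(List<S> source, Function<S, T> stages) {
        this.source = source;
        this.stages = stages;
    }

    static <S> ToyPipeline<S, S> from(List<S> source) {
        return new ToyPipeline<>(source, Function.identity());
    }

    // Records one more stage; nothing is executed here.
    <R> ToyPipeline<S, R> map(Function<T, R> stage) {
        return new ToyPipeline<>(source, stages.andThen(stage));
    }

    // Pushes every source record through the chain and collects the results, like a sink.
    List<T> execute() {
        List<T> out = new ArrayList<>();
        for (S record : source) {
            out.add(stages.apply(record));
        }
        return out;
    }
}

class ToyPipelineDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        ToyPipeline<String, Double> pipeline = ToyPipeline.from(List.of("u1,i1,4.0", "u2,i3,2.5"))
                .map(line -> line.split(","))  // stage 1: parse a "user,item,rating" line
                .map(fields -> {               // stage 2: sees only its own input record
                    calls.incrementAndGet();
                    return Double.parseDouble(fields[2]);
                });
        System.out.println(calls.get());        // 0: the pipeline has only been described
        System.out.println(pipeline.execute()); // [4.0, 2.5]
        System.out.println(calls.get());        // 2: one call per record
    }
}
```

The same model explains why the `for` loop in the question does not iterate over the data: each pass of the loop, at build time, just adds another `process` branch on `sourceDataStream` to the graph.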
This paradigm isn't a great fit for training machine learning models, since training involves iterating/looping. If that is your objective, maybe take a look at https://github.com/apache/flink-ml.
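For contrast, here is LFM training written as an ordinary single-process Java loop, outside Flink, reusing the bias and factor update formulas from the question. It makes the iteration explicit: every epoch revisits the same ratings and reads the biases and factors written during the previous epoch, which is the kind of feedback a source-to-sink DAG does not allow. This is a sketch under stated assumptions: the question's `predict()` is not shown, so prediction is assumed to be `bu + bi + p·q` with no global mean; factors are assumed to start at 0.1; `LfmSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of LFM training by SGD, mirroring the question's update rules.
// Assumptions: prediction = bu + bi + p·q (no global mean); factors initialised to 0.1.
final class LfmSketch {
    static final class Rating {
        final String user;
        final String item;
        final double value;

        Rating(String user, String item, double value) {
            this.user = user;
            this.item = item;
            this.value = value;
        }
    }

    final int f;          // number of latent factors (F in the question)
    final double alpha;   // learning rate
    final double lambd;   // regularisation weight
    final Map<String, Double> bu = new HashMap<>();  // user biases
    final Map<String, Double> bi = new HashMap<>();  // item biases
    final Map<String, double[]> p = new HashMap<>(); // user factor vectors
    final Map<String, double[]> q = new HashMap<>(); // item factor vectors

    LfmSketch(int f, double alpha, double lambd) {
        this.f = f;
        this.alpha = alpha;
        this.lambd = lambd;
    }

    private double[] initialFactors() {
        double[] v = new double[f];
        Arrays.fill(v, 0.1);
        return v;
    }

    double predict(String user, String item) {
        double[] pu = p.computeIfAbsent(user, key -> initialFactors());
        double[] qi = q.computeIfAbsent(item, key -> initialFactors());
        double dot = 0.0;
        for (int k = 0; k < f; k++) {
            dot += pu[k] * qi[k];
        }
        return bu.getOrDefault(user, 0.0) + bi.getOrDefault(item, 0.0) + dot;
    }

    // One SGD step for a single rating, using the same formulas as the question.
    void update(Rating r) {
        double err = r.value - predict(r.user, r.item);
        double oldBu = bu.getOrDefault(r.user, 0.0);
        double oldBi = bi.getOrDefault(r.item, 0.0);
        bu.put(r.user, oldBu + alpha * (err - lambd * oldBu));
        bi.put(r.item, oldBi + alpha * (err - lambd * oldBi));
        double[] pu = p.get(r.user); // created by predict() above
        double[] qi = q.get(r.item);
        for (int k = 0; k < f; k++) {
            double pLast = pu[k];
            double qLast = qi[k];
            qi[k] = qLast + alpha * (err * pLast - lambd * qLast);
            pu[k] = pLast + alpha * (err * qLast - lambd * pLast);
        }
    }

    // The loop itself: each epoch re-reads all ratings and the model state
    // produced by the previous epoch.
    void train(List<Rating> ratings, int iterations) {
        for (int iter = 0; iter < iterations; iter++) {
            for (Rating r : ratings) {
                update(r);
            }
        }
    }
}
```

With `f = 2`, `alpha = 0.1` and `lambd = 0.1`, one update on a rating of 1.0 moves the prediction for that pair from 0.02 to about 0.2197, and repeated epochs keep pulling it toward the rating.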