'The implementation of the StreamExecutionEnvironment is not serializable' 的 Flink 错误

Flink error with 'The implementation of the StreamExecutionEnvironment is not serializable'

我是Flink初学者,尝试用Flink去运行推荐算法之一的LFM,但是在运行ning的时候我的代码出现了如下错误。我试图找到并修改它们,但没有解决。谁能告诉我为什么遇到问题?

这是我的主要例外

The implementation of the StreamExecutionEnvironment is not serializable

还有我的代码

注意到 sourceDataStream 来自我的自定义源扩展 RichFunction<Tuple3<>>

//training model
for (int iter = 0; iter < iterations; iter++) {
    sourceDataStream
        // the exception appears here
        .process(new ProcessFunction<Tuple3<String, String, Double>, Object>() {
            @Override
            public void processElement(Tuple3<String, String, Double> in,
                                                   ProcessFunction<Tuple3<String, String, Double>, Object>.Context context,
                                                   Collector<Object> collector) throws Exception {

                Double hat_rui = predict(in.f0, in.f1, qbiTable, pbuTable, streamTableEnvironment);
                Double err_ui = in.f2 - hat_rui;
                Table pbuSelectTable = pbuTable.select($("buValue"), $("pList")).where($("userId").isEqual(in.f0));
                Table qbiSelectTable = qbiTable.select($("biValue"), $("qList")).where($("itemId").isEqual(in.f1));

                DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> pbuSelectDataStream = streamTableEnvironment.toRetractStream(pbuSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                pbuSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {

                @Override
                public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> userTuple,
                                                           ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                                                           Collector<Object> collector) throws Exception {

                    DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> qbiSelectDataStream = streamTableEnvironment.toRetractStream(qbiSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                    qbiSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {

                        @Override
                        public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> itemTuple, ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context, Collector<Object> collector) throws Exception {

                            double bu = userTuple.f1.f0 + (alpha * (err_ui - lambd * userTuple.f1.f0));
                            double bi = itemTuple.f1.f0 + (alpha * (err_ui - lambd * itemTuple.f1.f0));

                            List<Double> pList = new ArrayList<>();
                            List<Double> qList = new ArrayList<>();
                            for (int fIter = 0; fIter < F; fIter++) {
                                Double pValueLast = userTuple.f1.f1.get(fIter);
                                Double qValueLast = itemTuple.f1.f1.get(fIter);
                                Double qValueNew = qValueLast + (alpha * (err_ui * pValueLast - lambd * qValueLast));
                                Double pValueNew = pValueLast + (alpha * (err_ui * qValueLast - lambd * pValueLast));
                                pList.add(pValueNew);
                                qList.add(qValueNew);
                            }
                            streamTableEnvironment.executeSql("INSERT OVERWRITE qbiTable VALUES ('qList', " + qList + "), ('biValue', " + bi + ")");
                            streamTableEnvironment.executeSql("INSERT OVERWRITE pbuTable VALUES ('pList', " + pList + "), ('buValue', " + bu + ")");
                        }
                    });
                }
            });
        }
    });
}

关于此的一些事情是行不通的:

在任何用户函数(例如 ProcessFunction)的实现中,您不能有 DataStreamTableStreamExecutionEnvironment 或其他ProcessFunction。您所能做的就是对传入的流记录做出反应,可以选择使用您在该函数内根据先前处理的记录建立的状态。

DataStreamTable API 围绕构建器范例进行组织,您将使用该范例描述流式数据流管道。这个管道必须是有向无环图:它可以分裂和合并,但必须从源流到汇,没有任何循环。该管道的各个阶段(例如 ProcessFunction)必须编码为独立的块——它们无法到达自身之外以访问来自其他管道阶段的数据。

此范例不太适合训练机器学习模型的目的(因为训练涉及 iterating/looping)。如果那是你的 objective,也许可以看看 https://github.com/apache/flink-ml