SqlSessionTemplate在flink中是不可序列化的
SqlSessionTemplate is not serializable in flink
我的flink应用程序在启动时抛出这样的异常:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: org.apache.ibatis.binding.MapperProxy@3fe8ad3f is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:171)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:91)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1606)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:178)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1240)
at com.pdd.service.koduck.realtime.flink.operators.baseLayer.PlanProcessor.afterProcess(PlanProcessor.java:90)
at com.pdd.service.koduck.realtime.flink.operators.baseLayer.Processor.process(Processor.java:43)
at com.pdd.service.koduck.realtime.flink.Runner.run(Runner.java:49)
at com.pdd.service.koduck.realtime.flink.Main.main(Main.java:27)
Caused by: java.io.NotSerializableException: org.mybatis.spring.SqlSessionTemplate
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:153)
... 11 more
我认为原因是我的 SinkFunction 引用了一个 Mybatis Mapper 对象,该对象引用了一个不可序列化的 SqlSessionTemplate。
这是我的水槽功能:
public class MySinkFunction2<T> extends RichSinkFunction<List<PlanDailyTable.Row>> {
private PlanDailyDtoMapper mapper;
public MySinkFunction2(PlanDailyDtoMapper mapper) {
this.mapper = mapper;
}
@Override
public void invoke(List<PlanDailyTable.Row> value, Context context) throws Exception {
mapper.insertMultiRow(value);
}
}
如何解决这个问题?需要一些帮助
与其在构造函数中实例化 Mapper 对象,不如在 sink 的 open
方法中执行此操作,然后使 Mapper transient
.
Flink客户端调用了sink的构造函数,sink需要序列化发送给任务管理器。而接收器的打开方法在作业开始时在每个任务管理器中调用一次。
我的flink应用程序在启动时抛出这样的异常:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: org.apache.ibatis.binding.MapperProxy@3fe8ad3f is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:171)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:146)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:91)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1606)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:178)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1240)
at com.pdd.service.koduck.realtime.flink.operators.baseLayer.PlanProcessor.afterProcess(PlanProcessor.java:90)
at com.pdd.service.koduck.realtime.flink.operators.baseLayer.Processor.process(Processor.java:43)
at com.pdd.service.koduck.realtime.flink.Runner.run(Runner.java:49)
at com.pdd.service.koduck.realtime.flink.Main.main(Main.java:27)
Caused by: java.io.NotSerializableException: org.mybatis.spring.SqlSessionTemplate
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:153)
... 11 more
我认为原因是我的 SinkFunction 引用了一个 Mybatis Mapper 对象,该对象引用了一个不可序列化的 SqlSessionTemplate。
这是我的水槽功能:
public class MySinkFunction2<T> extends RichSinkFunction<List<PlanDailyTable.Row>> {
private PlanDailyDtoMapper mapper;
public MySinkFunction2(PlanDailyDtoMapper mapper) {
this.mapper = mapper;
}
@Override
public void invoke(List<PlanDailyTable.Row> value, Context context) throws Exception {
mapper.insertMultiRow(value);
}
}
如何解决这个问题?需要一些帮助
与其在构造函数中实例化 Mapper 对象,不如在 sink 的 open
方法中执行此操作,然后使 Mapper transient
.
Flink客户端调用了sink的构造函数,sink需要序列化发送给任务管理器。而接收器的打开方法在作业开始时在每个任务管理器中调用一次。