Flink Datastreams Evictor
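I am using Flink DataStreams to join two streams (a book stream and a publisher stream). I am trying to use an evictor to remove elements once they have been deleted from the database, which is indicated by the variable deleted.
The code runs fine without the evictor, but it fails as soon as I add the evictor.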
DataStream<BooksWithPublishers> book_publisher = bookStream
        .join(publishStream)
        .where(value -> value.publisherId)
        .equalTo(value -> value.id)
        .window(GlobalWindows.create())
        .trigger(new ForeverTrigger<>())
        .evictor(new Evictor<CoGroupedStreams.TaggedUnion<Book, Publisher>, GlobalWindow>() {
            @Override
            public void evictBefore(Iterable<TimestampedValue<CoGroupedStreams.TaggedUnion<Book, Publisher>>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
                Iterator<TimestampedValue<CoGroupedStreams.TaggedUnion<Book, Publisher>>> it = elements.iterator();
                while (it.hasNext()) {
                    CoGroupedStreams.TaggedUnion<Book, Publisher> cg = it.next().getValue();
                    // A TaggedUnion holds either a Book or a Publisher, never both,
                    // so read only the side that is present to avoid a NullPointerException.
                    boolean deleted = cg.isOne() ? cg.getOne().deleted : cg.getTwo().deleted;
                    if (deleted) {
                        it.remove();
                    }
                }
            }

            @Override
            public void evictAfter(Iterable<TimestampedValue<CoGroupedStreams.TaggedUnion<Book, Publisher>>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
                // Nothing to evict after the window function has run.
            }
        })
        .apply(new JoinFunction<Book, Publisher, BooksWithPublishers>() {
            @Override
            public BooksWithPublishers join(Book first, Publisher second) throws Exception {
                return new BooksWithPublishers(first.id, first.releaseDate, first.title, second);
            }
        });
Error:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:304)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:694)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:438)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:399)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:390)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:356)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:116)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:908)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:82)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1796)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
at com.x.x.pipelines.Bootstrapper.boot(Bootstrapper.java:68)
at com.x.x.Main.main(Main.java:7)
Caused by: java.io.NotSerializableException: com.x.x.pipelines.library.author.index.AuthorIndex
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:301)
... 19 more
I have also tried running the code with .evictor() and empty methods, but it still gives me the same error.
Why can't I use evictor()?
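The problem is most likely that your enclosing class (presumably AuthorIndex) is not serializable, and your program is trying to serialize it. An anonymous class created inside an instance method keeps an implicit reference to the enclosing instance, so when Flink serializes the operator together with the evictor, Java serialization follows that reference into AuthorIndex and fails. You can avoid this by writing the evictor as a separate class instead of an anonymous class, or by making the enclosing method static.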
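
To illustrate why an anonymous class can drag its enclosing class into serialization, here is a minimal plain-Java sketch with no Flink dependency. All names in it are hypothetical: Task stands in for a serializable Flink interface such as Evictor, ClosureSerializationDemo stands in for the non-serializable enclosing class (AuthorIndex in the question), and StandaloneTask stands in for an evictor written as its own class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable function interface such as Flink's Evictor.
interface Task extends Serializable {
    void run();
}

// Hypothetical stand-in for the enclosing class; note that it is NOT Serializable.
class ClosureSerializationDemo {

    private final String label = "index";

    // Anonymous class created in an instance method. It reads an outer field,
    // so it must hold a hidden reference to the enclosing instance.
    // (Older javac versions keep that reference even when it is unused.)
    Task anonymousTask() {
        return new Task() {
            @Override
            public void run() {
                System.out.println(label);
            }
        };
    }

    // Static nested class: no hidden reference to the enclosing instance.
    static class StandaloneTask implements Task {
        private static final long serialVersionUID = 1L;

        @Override
        public void run() {
            System.out.println("standalone");
        }
    }

    Task staticTask() {
        return new StandaloneTask();
    }

    // Tries Java serialization and reports whether it succeeded.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object graph reaches a non-serializable class.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClosureSerializationDemo outer = new ClosureSerializationDemo();
        System.out.println("anonymous class serializable: " + canSerialize(outer.anonymousTask()));
        System.out.println("static nested class serializable: " + canSerialize(outer.staticTask()));
    }
}
```

Applied to the question, the same idea would mean moving the evictor into its own top-level or static nested class that implements Evictor<CoGroupedStreams.TaggedUnion<Book, Publisher>, GlobalWindow> and passing an instance of it to .evictor(...), so that nothing from AuthorIndex is captured.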