坚持 Apache Flink window

Persist Apache Flink window

我正在尝试使用 Flink 在流式传输中使用来自消息队列的有界数据。数据将采用以下格式:

{"id":-1,"name":"Start"}
{"id":1,"name":"Foo 1"}
{"id":2,"name":"Foo 2"}
{"id":3,"name":"Foo 3"}
{"id":4,"name":"Foo 4"}
{"id":5,"name":"Foo 5"}
...
{"id":-2,"name":"End"}

可以使用事件 ID 确定消息的开始和结束。我想接收这样的批次并将最新的(通过覆盖)批次存储在磁盘或内存中。我可以编写一个自定义 window 触发器来使用开始和结束标志提取事件,如下所示:

DataStream<Foo> fooDataStream = ...
AllWindowedStream<Foo, GlobalWindow> fooWindow = fooDataStream.windowAll(GlobalWindows.create())
.trigger(new CustomTrigger<>())
.evictor(new Evictor<Foo, GlobalWindow>() {
    @Override
    public void evictBefore(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
        for (Iterator<TimestampedValue<Foo>> iterator = elements.iterator();
             iterator.hasNext(); ) {
            TimestampedValue<Foo> foo = iterator.next();
            if (foo.getValue().getId() < 0) {
                iterator.remove();
            }
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {

    }
});

但是我怎样才能保留最新的 window 的输出。一种方法是使用 ProcessAllWindowFunction 接收所有事件并手动将它们写入磁盘,但这感觉像是一种黑客攻击。我也在研究 Table API 与 Flink CEP 模式(像这样 question)但是找不到在每批丢弃后清除 Table 的方法上一批的事件。

有几件事妨碍了您想要的东西:

(1) Flink 的 window 运算符生成附加流,而不是更新流。它们不是为了更新以前发出的结果而设计的。 CEP 也不产生更新流。

(2) Flink的文件系统抽象不支持覆盖文件。这是因为像 S3 这样的对象存储不能很好地支持这种操作。

我认为你的选择是:

(1) 重新处理您的工作,以便它生成更新(变更日志)流。您可以使用 toChangelogStream 或使用创建更新流的 Table/SQL 操作来执行此操作,例如 GROUP BY(当它没有使用时间 window 时)。除此之外,您还需要选择支持 retractions/updates 的接收器,例如数据库。

(2) 坚持生成附加流并使用类似 FileSink 的东西将结果写入一系列滚动文件。然后在 Flink 之外做一些脚本来得到你想要的东西。