在 flink global window 中重新处理未更改的元素,并进行连接转换

Unchanged elements reprocessed in flink global window, with a join transformation

Flink 转换(合并)流中有元素正在重新处理,即使它们没有被修改。

假设我们有 3 个元素:1、2 和 3。当插入它们时,会发生这种情况:

在最后一次插入中,1 或 2 没有任何变化,因此没有理由重新处理它们。

重新处理规则:

join后正在使用全局window,如下图:

            bookStream
                .join(publisherStream)
                .where(book -> book.publisherId)
                .equalTo(publisher -> publisher.id)
                .window(GlobalWindows.create())
                .trigger(new ForeverTrigger<>())
                .apply(new JoinFunction<Book, Publisher, Book_Publisher>() {
                    @Override
                    public Book_Publisher join(Book book, Publisher publisher) throws Exception {
                        return new Book_Publisher(book, publisher);
                    }
                })

ForeverTrigger 实现:

public class ForeverTrigger<T, E extends Window> extends Trigger<T, E> {

    @Override
    public TriggerResult onElement(T element, long timestamp, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, E window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(E window, TriggerContext ctx) throws Exception {}
}

对于这个用例,需要一直存储所有元素,因为如果一本书更新了,我们需要有相应的出版商加入,反之亦然。因此从 bookStreampublisherStream 中删除元素不是一个选项。

一种解决方案是使用 TableAPI,如此处所述:。这会起作用,然后可以转换为数据流。但是,我想避免将 table API 用法与数据流 API 用法混合使用,特别是因为主要项目目标是通用化和自动化 flink 管道的创建,这意味着有将是两个 API 来概括而不是一个。所以,如果有不同的高效解决方案,那就太好了。

另一种解决方案是驱逐或过滤元素,如上面相同的 post 链接中所述,但这似乎效率低下,因为它仍然需要处理元素,以便 evict/filter 它们。这将需要保留先前状态的列表并比较传入的元素。

理想情况下,Flink 知道只处理包含更改的元素。是否有有效的解决方案来执行此与数据流的连接并仅处理修改后的元素?

窗口联接在设计时并未考虑到这种情况。为了有效地处理这个问题,我认为您需要在 API 堆栈中下降一个级别并使用 KeyedCoProcessFunctions,或者上升一个级别并使用 Table API.