在 flink global window 中重新处理未更改的元素,并进行连接转换
Unchanged elements reprocessed in flink global window, with a join transformation
Flink 转换(合并)流中有元素正在重新处理,即使它们没有被修改。
假设我们有 3 个元素:1、2 和 3。当插入它们时,会发生这种情况:
- 当插入第一个元素1时,输出为:1
- 当插入第二个元素2时,输出为:1 -> 2(1被重新处理输出)
- 第三个元素插入:1 -> 2 -> 3(1和2重新处理)
在最后一次插入中,1 或 2 没有任何变化,因此没有理由重新处理它们。
重新处理规则:
- 仅重新处理同一出版商的图书。这意味着当插入出版商 2 的书籍时,只有出版商 2 的书籍被重新处理。我们的目标是不重新处理任何内容,因为它们不受现有新书的影响。
- 修改出版商时,只会重新处理该出版商的图书。 (没关系)
join后正在使用全局window,如下图:
bookStream
.join(publisherStream)
.where(book -> book.publisherId)
.equalTo(publisher -> publisher.id)
.window(GlobalWindows.create())
.trigger(new ForeverTrigger<>())
.apply(new JoinFunction<Book, Publisher, Book_Publisher>() {
@Override
public Book_Publisher join(Book book, Publisher publisher) throws Exception {
return new Book_Publisher(book, publisher);
}
})
ForeverTrigger 实现:
public class ForeverTrigger<T, E extends Window> extends Trigger<T, E> {
@Override
public TriggerResult onElement(T element, long timestamp, E window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(long time, E window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, E window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(E window, TriggerContext ctx) throws Exception {}
}
对于这个用例,需要一直存储所有元素,因为如果一本书更新了,我们需要有相应的出版商加入,反之亦然。因此从 bookStream
或 publisherStream
中删除元素不是一个选项。
一种解决方案是使用 TableAPI,如此处所述:。这会起作用,然后可以转换为数据流。但是,我想避免将 table API 用法与数据流 API 用法混合使用,特别是因为主要项目目标是通用化和自动化 flink 管道的创建,这意味着有将是两个 API 来概括而不是一个。所以,如果有不同的高效解决方案,那就太好了。
另一种解决方案是驱逐或过滤元素,如上面相同的 post 链接中所述,但这似乎效率低下,因为它仍然需要处理元素,以便 evict/filter 它们。这将需要保留先前状态的列表并比较传入的元素。
理想情况下,Flink 知道只处理包含更改的元素。是否有有效的解决方案来执行此与数据流的连接并仅处理修改后的元素?
窗口联接在设计时并未考虑到这种情况。为了有效地处理这个问题,我认为您需要在 API 堆栈中下降一个级别并使用 KeyedCoProcessFunctions,或者上升一个级别并使用 Table API.
Flink 转换(合并)流中有元素正在重新处理,即使它们没有被修改。
假设我们有 3 个元素:1、2 和 3。当插入它们时,会发生这种情况:
- 当插入第一个元素1时,输出为:1
- 当插入第二个元素2时,输出为:1 -> 2(1被重新处理输出)
- 第三个元素插入:1 -> 2 -> 3(1和2重新处理)
在最后一次插入中,1 或 2 没有任何变化,因此没有理由重新处理它们。
重新处理规则:
- 仅重新处理同一出版商的图书。这意味着当插入出版商 2 的书籍时,只有出版商 2 的书籍被重新处理。我们的目标是不重新处理任何内容,因为它们不受现有新书的影响。
- 修改出版商时,只会重新处理该出版商的图书。 (没关系)
join后正在使用全局window,如下图:
bookStream
.join(publisherStream)
.where(book -> book.publisherId)
.equalTo(publisher -> publisher.id)
.window(GlobalWindows.create())
.trigger(new ForeverTrigger<>())
.apply(new JoinFunction<Book, Publisher, Book_Publisher>() {
@Override
public Book_Publisher join(Book book, Publisher publisher) throws Exception {
return new Book_Publisher(book, publisher);
}
})
ForeverTrigger 实现:
public class ForeverTrigger<T, E extends Window> extends Trigger<T, E> {
@Override
public TriggerResult onElement(T element, long timestamp, E window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(long time, E window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, E window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(E window, TriggerContext ctx) throws Exception {}
}
对于这个用例,需要一直存储所有元素,因为如果一本书更新了,我们需要有相应的出版商加入,反之亦然。因此从 bookStream
或 publisherStream
中删除元素不是一个选项。
一种解决方案是使用 TableAPI,如此处所述:
另一种解决方案是驱逐或过滤元素,如上面相同的 post 链接中所述,但这似乎效率低下,因为它仍然需要处理元素,以便 evict/filter 它们。这将需要保留先前状态的列表并比较传入的元素。
理想情况下,Flink 知道只处理包含更改的元素。是否有有效的解决方案来执行此与数据流的连接并仅处理修改后的元素?
窗口联接在设计时并未考虑到这种情况。为了有效地处理这个问题,我认为您需要在 API 堆栈中下降一个级别并使用 KeyedCoProcessFunctions,或者上升一个级别并使用 Table API.