我可以使用 KSQL 生成处理时间超时吗?

can I use KSQL to generate processing-time timeouts?

我正在尝试使用 KSQL 在一个时间限制内做任何我能做的处理,并在那个时间限制内得到结果。请参阅 "Processing Time Timers" 下的 Timely (and Stateful) Processing with Apache Beam,了解使用 Apache Beam 说明的相同想法。

鉴于:

  1. 具有唯一键的交易流;
  2. 在同一流中更新这些事务;和
  3. 下游处理器希望在特定超时(比如 20 秒)在事务出现在第一个流中之后接收更新的事务。

从概念上讲,我正在考虑创建第一个流的 KTable 来保存事务的最新状态,并使用 KSQL 通过使用 (create_time + timeout 查询 KTable 的键来创建输出流) < current_time。 (并将超时作为 "updates" 添加到第一个流,以便我可以从 KTable 中过滤掉它们)

我还没有在 KSQL 文档中找到执行此操作的方法,即使有内置的 current_time,我也不确定它是否会被评估,直到另一条记录出现流。

我如何在 KSQL 中执行此操作?我需要自定义 UDF 吗?如果在KSQL中做不到,我可以在KStreams中做吗?

=====

更新:看起来 KStreams 今天不支持这个 - Apache Flink 似乎是这个用例(以及许多其他用例)的方法。如果您知道绕过 KStreams 限制的巧妙方法,请告诉我!

查看 Kafka Streams Processor API 中的 punctuate() 功能,这可能是您正在寻找的。您可以将 punctuate() 与流时间(默认值:事件时间)以及处理时间(通过 PunctuationType.WALL_CLOCK_TIME)一起使用。在这里,您将根据需要实施 ProcessorTransformer,这将使用 punctuate() 实现超时功能。

有关详细信息,请参阅 https://kafka.apache.org/documentation/streams/developer-guide/processor-api.html

提示:您也可以在 Kafka Streams 的 DSL 中使用这样的 Processor/Transformer。这意味着您可以继续使用更方便的 DSL,如果您愿意,只需在基于 DSL 的代码中的正确位置插入 Processor/Transformer。