Kafka Streams 检测丢失的记录

Kafka Streams detecting missing records

我正在通过 Kafka Streams 2.10 构建一个流媒体应用程序,但我遇到了一个概念性问题。

The producer1 sends (Key -> Value): Session1 -> RUNNING

The producer2 sends (Key -> Value): Sessionabc -> RUNNING

The producer1 sends (Key -> Value): Session1 -> DONE

现在我想检测死会话。我正在尝试使用 SessionWindow 但因为 Kafka 逐条计算记录,所以我无法一次全部计算。

这是我的片段:

builder
    .stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .windowedBy(SessionWindows.with(SESSION_DURATION))
    .reduce(new SessionReducer())
    .toStream((windowed, value) -> windowed.key())
    .filter((k,v)-> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
    .peek((a,b)->System.out.println("This Value is missing: \n   "+a.toString()+b.toString()));`

注意:reducer 只是确保当我们看到 DONE 时,无论我们在同一会话中有哪个其他元素,它总是会完成。 有什么想法吗?

使用处理器 API 只需多一点代码就可以轻松完成。 DSL 可以与处理器 API.

混合使用

处理过程如下所示。

  1. 构建状态存储并使用 StreamsBuilder::addStateStore
  2. 添加它
  3. 创建 KStream 并使用 Transformer 调用 KStream::transform 函数,完成全部工作
  4. 如果会话为 DEADDONE
  5. ,转换结果将是带有信息的消息
  6. 使用 Transformer,您可以实现每条消息的处理方式。对于每条消息,您必须更新 keyValue Store,其中 key 是会话 ID。您必须保存有关会话的最后一条消息的时间戳
  7. 然后在 Punctuator(定期调用)中,检查哪个会话超时,并使用状态为 (DONE, DEAD)
  8. ProcessorContext::forward 传递信息

完整的代码是如何做到的,你可以找到here