Kafka Streams 检测丢失的记录
Kafka Streams detecting missing records
我正在通过 Kafka Streams 2.10 构建一个流媒体应用程序,但我遇到了一个概念性问题。
The producer1 sends (Key -> Value): Session1 -> RUNNING
The producer2 sends (Key -> Value): Sessionabc -> RUNNING
The producer1 sends (Key -> Value): Session1 -> DONE
现在我想检测死会话。我正在尝试使用 SessionWindow 但因为 Kafka 逐条计算记录,所以我无法一次全部计算。
这是我的片段:
builder
.stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
.groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
.windowedBy(SessionWindows.with(SESSION_DURATION))
.reduce(new SessionReducer())
.toStream((windowed, value) -> windowed.key())
.filter((k,v)-> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
.peek((a,b)->System.out.println("This Value is missing: \n "+a.toString()+b.toString()));`
注意:reducer 只是确保当我们看到 DONE 时,无论我们在同一会话中有哪个其他元素,它总是会完成。
有什么想法吗?
使用处理器 API 只需多一点代码就可以轻松完成。 DSL 可以与处理器 API.
混合使用
处理过程如下所示。
- 构建状态存储并使用
StreamsBuilder::addStateStore
添加它
- 创建 KStream 并使用 Transformer 调用
KStream::transform
函数,完成全部工作
- 如果会话为 DEAD 或 DONE
,转换结果将是带有信息的消息
- 使用 Transformer,您可以实现每条消息的处理方式。对于每条消息,您必须更新 keyValue Store,其中 key 是会话 ID。您必须保存有关会话的最后一条消息的时间戳
- 然后在 Punctuator(定期调用)中,检查哪个会话超时,并使用状态为 (DONE, DEAD)
的 ProcessorContext::forward
传递信息
完整的代码是如何做到的,你可以找到here
我正在通过 Kafka Streams 2.10 构建一个流媒体应用程序,但我遇到了一个概念性问题。
The producer1 sends (Key -> Value): Session1 -> RUNNING
The producer2 sends (Key -> Value): Sessionabc -> RUNNING
The producer1 sends (Key -> Value): Session1 -> DONE
现在我想检测死会话。我正在尝试使用 SessionWindow 但因为 Kafka 逐条计算记录,所以我无法一次全部计算。
这是我的片段:
builder
.stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
.groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
.windowedBy(SessionWindows.with(SESSION_DURATION))
.reduce(new SessionReducer())
.toStream((windowed, value) -> windowed.key())
.filter((k,v)-> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
.peek((a,b)->System.out.println("This Value is missing: \n "+a.toString()+b.toString()));`
注意:reducer 只是确保当我们看到 DONE 时,无论我们在同一会话中有哪个其他元素,它总是会完成。 有什么想法吗?
使用处理器 API 只需多一点代码就可以轻松完成。 DSL 可以与处理器 API.
混合使用处理过程如下所示。
- 构建状态存储并使用
StreamsBuilder::addStateStore
添加它
- 创建 KStream 并使用 Transformer 调用
KStream::transform
函数,完成全部工作 - 如果会话为 DEAD 或 DONE ,转换结果将是带有信息的消息
- 使用 Transformer,您可以实现每条消息的处理方式。对于每条消息,您必须更新 keyValue Store,其中 key 是会话 ID。您必须保存有关会话的最后一条消息的时间戳
- 然后在 Punctuator(定期调用)中,检查哪个会话超时,并使用状态为 (DONE, DEAD) 的
ProcessorContext::forward
传递信息
完整的代码是如何做到的,你可以找到here