如何处理 Kafka KStream 意外状态?
How do I handle Kafka KStream unexpected state?
我有一个 Kafka 流,设置如下:
- 关于主题 A 的消息进入状态 "created"
- 调用远程 REST 端点以通知已收到消息
- 然后转换消息(状态设置为"notified")并发送
退出主题 A
我知道如何为快乐路径设置它(REST 端点 returns HTTP 2xx)。我的问题是:如果我的端点 returns 400,我该如何配置我的 kstream 来处理它而不发出消息?
这是我的 KSteam 配置:
final KStream<String, Message> stream = streamsBuilder.stream(topic,
Consumed.with(Serdes.String(), messageJsonSerde));
stream
.filter((key, value) -> MessageType.CASE_MATCHED.equals(
value.getEventType()))
.mapValues(notifierService::handleNotification) // calls REST endpoint
.to(topic, Produced.with(Serdes.String(), messageJsonSerde));
return stream;
我想最简单的方法是,在 mapValue
之后添加另一个 filter
以丢弃未成功处理的消息。
您可以更改第一个 mapValue
以将此信息添加到记录中,以便 filter
可以访问它。此外,您可以在第二个 filter
之后执行第二个 mapValue
以删除此附加 success
标志,以将未修改的消息写回 Kafka。
当然,这不会重新处理任何失败的消息,而只会从输出中过滤掉失败的消息。
我有一个 Kafka 流,设置如下:
- 关于主题 A 的消息进入状态 "created"
- 调用远程 REST 端点以通知已收到消息
- 然后转换消息(状态设置为"notified")并发送 退出主题 A
我知道如何为快乐路径设置它(REST 端点 returns HTTP 2xx)。我的问题是:如果我的端点 returns 400,我该如何配置我的 kstream 来处理它而不发出消息?
这是我的 KSteam 配置:
final KStream<String, Message> stream = streamsBuilder.stream(topic,
Consumed.with(Serdes.String(), messageJsonSerde));
stream
.filter((key, value) -> MessageType.CASE_MATCHED.equals(
value.getEventType()))
.mapValues(notifierService::handleNotification) // calls REST endpoint
.to(topic, Produced.with(Serdes.String(), messageJsonSerde));
return stream;
我想最简单的方法是,在 mapValue
之后添加另一个 filter
以丢弃未成功处理的消息。
您可以更改第一个 mapValue
以将此信息添加到记录中,以便 filter
可以访问它。此外,您可以在第二个 filter
之后执行第二个 mapValue
以删除此附加 success
标志,以将未修改的消息写回 Kafka。
当然,这不会重新处理任何失败的消息,而只会从输出中过滤掉失败的消息。