如何处理 Kafka KStream 意外状态?

How do I handle Kafka KStream unexpected state?

我有一个 Kafka 流,设置如下:

  1. 关于主题 A 的消息进入状态 "created"
  2. 调用远程 REST 端点以通知已收到消息
  3. 然后转换消息(状态设置为"notified")并发送 退出主题 A

我知道如何为快乐路径设置它(REST 端点 returns HTTP 2xx)。我的问题是:如果我的端点 returns 400,我该如何配置我的 kstream 来处理它而不发出消息?

这是我的 KSteam 配置:

    final KStream<String, Message> stream = streamsBuilder.stream(topic,
            Consumed.with(Serdes.String(), messageJsonSerde));
    stream
            .filter((key, value) -> MessageType.CASE_MATCHED.equals(
                    value.getEventType()))
            .mapValues(notifierService::handleNotification) // calls REST endpoint
            .to(topic, Produced.with(Serdes.String(), messageJsonSerde));
    return stream;

我想最简单的方法是,在 mapValue 之后添加另一个 filter 以丢弃未成功处理的消息。

您可以更改第一个 mapValue 以将此信息添加到记录中,以便 filter 可以访问它。此外,您可以在第二个 filter 之后执行第二个 mapValue 以删除此附加 success 标志,以将未修改的消息写回 Kafka。

当然,这不会重新处理任何失败的消息,而只会从输出中过滤掉失败的消息。