如何在 Kafka Stream map() 中等待未来?

How to wait for future inside Kafka Stream map()?

我正在 Java 中实施 Spring 启动应用程序,使用 Spring Cloud Stream 和 Kafka Streams 活页夹。

我需要像这样在 KStream 映射方法中实现阻塞操作:

public Consumer<KStream<?, ?>> sink() {
    return input -> input
        .mapValues(value -> methodReturningCompletableFuture(value).get())
        .foreach((key, value) -> otherMethod(key, value));
}

completableFuture.get() 抛出异常(InterruptedException、ExecutionException)

如何处理这些异常,使链式方法不被执行并且Kafka 消息不被确认?我不能承受消息丢失,不能将其发送到死信主题。

map()里面有没有更好的阻塞方式?

您可以尝试Kafka Streams 中的分支功能来控制链式方法的执行。例如,这里有一个 pseudo-code 你可以试试。 您可以以此为起点并根据您的特定用例进行调整。

final Map<String, ? extends KStream<?, String>> branches = 
input.split()
     .branch(k, v) -> {
        try {
          methodReturningCompletableFuture(value).get();
          return true;
        }
        catch (Exception e) {
          return false;
        }
      }, Branched.as("good-records"))
      .defaultBranch();

final KStream<?, String> kStream = branches.get("good-records");

 kStream.foreach((key, value) -> otherMethod(key, value));

这里的想法是,您只会将没有抛出异常的记录发送到指定分支 good-records,其他所有内容都会进入我们在此 [=25= 中忽略的默认分支].然后,您仅为那些“好”记录调用其他链接方法(如此 foreach 调用所示)。

这并没有解决抛出异常后不确认消息的问题。这似乎有点挑战。但是,我对那个用例很好奇。当异常发生并且你处理它时,你为什么不想确认消息?如果不使用 DLT,要求似乎有点严格。这里理想的解决方案是您可能想要引入一些重试,一旦重试耗尽,将记录发送到 DLT,这使得 Kafka Streams 消费者确认消息。然后应用程序移动到下一个偏移量。

调用 methodReturningCompletableFuture(value).get() 只是等待,直到达到默认或配置的超时,假设 methodReturningCompletableFuture() returns 一个 Future 对象。因此,在 KStream 映射操作中等待已经是一个很好的方法。我认为没有必要让它再等下去。