如何在 Kafka Stream map() 中等待未来?
How to wait for future inside Kafka Stream map()?
我正在 Java 中实施 Spring 启动应用程序,使用 Spring Cloud Stream 和 Kafka Streams 活页夹。
我需要像这样在 KStream 映射方法中实现阻塞操作:
public Consumer<KStream<?, ?>> sink() {
return input -> input
.mapValues(value -> methodReturningCompletableFuture(value).get())
.foreach((key, value) -> otherMethod(key, value));
}
completableFuture.get()
抛出异常(InterruptedException、ExecutionException)
如何处理这些异常,使链式方法不被执行并且Kafka 消息不被确认?我不能承受消息丢失,不能将其发送到死信主题。
在map()
里面有没有更好的阻塞方式?
您可以尝试Kafka Streams 中的分支功能来控制链式方法的执行。例如,这里有一个 pseudo-code 你可以试试。
您可以以此为起点并根据您的特定用例进行调整。
final Map<String, ? extends KStream<?, String>> branches =
input.split()
.branch(k, v) -> {
try {
methodReturningCompletableFuture(value).get();
return true;
}
catch (Exception e) {
return false;
}
}, Branched.as("good-records"))
.defaultBranch();
final KStream<?, String> kStream = branches.get("good-records");
kStream.foreach((key, value) -> otherMethod(key, value));
这里的想法是,您只会将没有抛出异常的记录发送到指定分支 good-records
,其他所有内容都会进入我们在此 [=25= 中忽略的默认分支].然后,您仅为那些“好”记录调用其他链接方法(如此 foreach
调用所示)。
这并没有解决抛出异常后不确认消息的问题。这似乎有点挑战。但是,我对那个用例很好奇。当异常发生并且你处理它时,你为什么不想确认消息?如果不使用 DLT,要求似乎有点严格。这里理想的解决方案是您可能想要引入一些重试,一旦重试耗尽,将记录发送到 DLT,这使得 Kafka Streams 消费者确认消息。然后应用程序移动到下一个偏移量。
调用 methodReturningCompletableFuture(value).get()
只是等待,直到达到默认或配置的超时,假设 methodReturningCompletableFuture()
returns 一个 Future
对象。因此,在 KStream
映射操作中等待已经是一个很好的方法。我认为没有必要让它再等下去。
我正在 Java 中实施 Spring 启动应用程序,使用 Spring Cloud Stream 和 Kafka Streams 活页夹。
我需要像这样在 KStream 映射方法中实现阻塞操作:
public Consumer<KStream<?, ?>> sink() {
return input -> input
.mapValues(value -> methodReturningCompletableFuture(value).get())
.foreach((key, value) -> otherMethod(key, value));
}
completableFuture.get()
抛出异常(InterruptedException、ExecutionException)
如何处理这些异常,使链式方法不被执行并且Kafka 消息不被确认?我不能承受消息丢失,不能将其发送到死信主题。
在map()
里面有没有更好的阻塞方式?
您可以尝试Kafka Streams 中的分支功能来控制链式方法的执行。例如,这里有一个 pseudo-code 你可以试试。 您可以以此为起点并根据您的特定用例进行调整。
final Map<String, ? extends KStream<?, String>> branches =
input.split()
.branch(k, v) -> {
try {
methodReturningCompletableFuture(value).get();
return true;
}
catch (Exception e) {
return false;
}
}, Branched.as("good-records"))
.defaultBranch();
final KStream<?, String> kStream = branches.get("good-records");
kStream.foreach((key, value) -> otherMethod(key, value));
这里的想法是,您只会将没有抛出异常的记录发送到指定分支 good-records
,其他所有内容都会进入我们在此 [=25= 中忽略的默认分支].然后,您仅为那些“好”记录调用其他链接方法(如此 foreach
调用所示)。
这并没有解决抛出异常后不确认消息的问题。这似乎有点挑战。但是,我对那个用例很好奇。当异常发生并且你处理它时,你为什么不想确认消息?如果不使用 DLT,要求似乎有点严格。这里理想的解决方案是您可能想要引入一些重试,一旦重试耗尽,将记录发送到 DLT,这使得 Kafka Streams 消费者确认消息。然后应用程序移动到下一个偏移量。
调用 methodReturningCompletableFuture(value).get()
只是等待,直到达到默认或配置的超时,假设 methodReturningCompletableFuture()
returns 一个 Future
对象。因此,在 KStream
映射操作中等待已经是一个很好的方法。我认为没有必要让它再等下去。