运行 个线程时的异常处理

Exception handling while running threads

我有一个应用程序,它有一个 kafka 消费者并根据它收到的数据更新弹性搜索。

我的问题是,每当 ES 出现故障时,kafka 消费者会完全停止并且不会重新启动。

我相信这是因为我的 ES 代码是这样 运行:

public CompletionStage<SearchResponse> executeSearch(SearchRequest searchRequest) {
        CompletableFuture<SearchResponse> f = new CompletableFuture<>();

        client.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
            @Override
            public void onResponse(SearchResponse searchResponse) {
                f.complete(searchResponse);
            }

            @Override
            public void onFailure(Exception e) {

                throw new Exception();  // I am guessing because of this
            }
        });
        return f;
} 

如果我将 onFailure 方法更改为:

            public void onFailure(Exception e) {

                f.complete(null);
            }

它工作得很好,但我不明白为什么抛出异常会导致这种情况。

如有任何帮助,我们将不胜感激。

对于那些需要解决方案的人,我将我的代码更改为以下以使其在异常情况下工作:

public void onFailure(Exception e) {
    f.completeExceptionally(new Exception());
}

也可能相关的是 exceptionally 方法和 CompletableFuture

中的 handle 方法

CompletableFuture.exceptionally((ex)-> ) - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#exceptionally-java.util.function.Function-

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#handle-java.util.function.BiFunction-