如何为Java中的CompletableFuture执行资源清理?
How to perform resource cleanup for CompletableFuture in Java?
我在 CompletableFuture
中有一段代码,如果有异常则执行重试,否则完成任务。我已将资源传递给 Supplier
和 Consumer
以执行任务,并希望在所有任务完成后关闭这些资源(重试 3 次后 success/exception
)。
这是一段代码:
Supplier mySupplier = new MySupplier(localContext);
CompletableFuture<String> future = CompletableFuture.supplyAsync(mySupplier);
for(int j = 0; j < (retryCount - 1); j++) {
LOGGER.debug("MySupplier accept() Retry count: "+j);
future = future.handleAsync((value, throwable) -> throwable == null? CompletableFuture.completedFuture(value): CompletableFuture.supplyAsync(mySupplier)).thenComposeAsync(Function.identity());
}
我打算将它放在供应商的 finally 块下,但如果发生第一个异常,资源将被关闭,我需要它们进行接下来的两次重试。
1) 如何让它发挥作用?
2)还有没有办法只在异常情况下打印重试次数?
由于您似乎并不关心中间结果,最简单的解决方案是将您的 Supplier
简单地包装在另一个处理重试的结果中:
class SupplierRetrier<T> implements Supplier<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(SupplierRetrier.class);
final Supplier<T> wrappee;
final int maxRetries;
SupplierRetrier(Supplier<T> wrappee, int maxRetries) {
Objects.requireNonNull(wrappee);
if (maxRetries <= 0) {
throw new IllegalArgumentException("maxRetries must be more than 0: " + maxRetries);
}
this.wrappee = wrappee;
this.maxRetries = maxRetries;
}
@Override
public T get() {
RuntimeException lastException = null;
for (int i = 0; i < maxRetries; i++) {
try {
LOGGER.info("MySupplier accept() Retry count: "+i);
return wrappee.get();
} catch (RuntimeException e) {
lastException = e;
}
}
throw lastException;
}
}
然后您可以简单地使用它:
CompletableFuture<String> future = CompletableFuture.supplyAsync(
new SupplierRetrier<>(mySupplier, retryCount));
为了清理上下文,只需对生成的未来添加一个 whenComplete()
调用。无论未来的结果如何,这将被执行。
future.whenComplete((r, e) -> {
try {
localContext.close();
} catch (Exception e2) {
throw new RuntimeException("Failed to close context", e2);
}
});
1) 对于资源清理,使用 whenComplete
或 whenCompleteAsync
2) 对于重试计数,使用长度 1
或 AtomicInteger
的 int[]
。 (无论是否抛出 Exception
,此值都可用)
int[] retryCounter = { 0 };
// AtomicInteger retryCounter = new AtomicInteger();
for (int i = 0; i < noOfRetries; i++)
{
CompletableFuture<CompletableFuture<String>> handleAsync = cf.handleAsync((result, throwable) ->
{
if (throwable == null)
return CompletableFuture.completedFuture(result);
retryCounter[0]++;
// retryCounter.incrementAndGet();
return CompletableFuture.supplyAsync(supplier);
});
cf = handleAsync.thenCompose(Function.identity());
}
cf = cf.whenCompleteAsync((result, throwable) ->
{
System.out.println("Clean up");
System.out.println("Retry count: " + retryCounter[0]);
// System.out.println("Retry count: " + retryCounter.get());
});
System.out.println("Wating for result...");
System.out.println("Result: " + cf.get());
我在 CompletableFuture
中有一段代码,如果有异常则执行重试,否则完成任务。我已将资源传递给 Supplier
和 Consumer
以执行任务,并希望在所有任务完成后关闭这些资源(重试 3 次后 success/exception
)。
这是一段代码:
Supplier mySupplier = new MySupplier(localContext);
CompletableFuture<String> future = CompletableFuture.supplyAsync(mySupplier);
for(int j = 0; j < (retryCount - 1); j++) {
LOGGER.debug("MySupplier accept() Retry count: "+j);
future = future.handleAsync((value, throwable) -> throwable == null? CompletableFuture.completedFuture(value): CompletableFuture.supplyAsync(mySupplier)).thenComposeAsync(Function.identity());
}
我打算将它放在供应商的 finally 块下,但如果发生第一个异常,资源将被关闭,我需要它们进行接下来的两次重试。
1) 如何让它发挥作用?
2)还有没有办法只在异常情况下打印重试次数?
由于您似乎并不关心中间结果,最简单的解决方案是将您的 Supplier
简单地包装在另一个处理重试的结果中:
class SupplierRetrier<T> implements Supplier<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(SupplierRetrier.class);
final Supplier<T> wrappee;
final int maxRetries;
SupplierRetrier(Supplier<T> wrappee, int maxRetries) {
Objects.requireNonNull(wrappee);
if (maxRetries <= 0) {
throw new IllegalArgumentException("maxRetries must be more than 0: " + maxRetries);
}
this.wrappee = wrappee;
this.maxRetries = maxRetries;
}
@Override
public T get() {
RuntimeException lastException = null;
for (int i = 0; i < maxRetries; i++) {
try {
LOGGER.info("MySupplier accept() Retry count: "+i);
return wrappee.get();
} catch (RuntimeException e) {
lastException = e;
}
}
throw lastException;
}
}
然后您可以简单地使用它:
CompletableFuture<String> future = CompletableFuture.supplyAsync(
new SupplierRetrier<>(mySupplier, retryCount));
为了清理上下文,只需对生成的未来添加一个 whenComplete()
调用。无论未来的结果如何,这将被执行。
future.whenComplete((r, e) -> {
try {
localContext.close();
} catch (Exception e2) {
throw new RuntimeException("Failed to close context", e2);
}
});
1) 对于资源清理,使用 whenComplete
或 whenCompleteAsync
2) 对于重试计数,使用长度 1
或 AtomicInteger
的 int[]
。 (无论是否抛出 Exception
,此值都可用)
int[] retryCounter = { 0 };
// AtomicInteger retryCounter = new AtomicInteger();
for (int i = 0; i < noOfRetries; i++)
{
CompletableFuture<CompletableFuture<String>> handleAsync = cf.handleAsync((result, throwable) ->
{
if (throwable == null)
return CompletableFuture.completedFuture(result);
retryCounter[0]++;
// retryCounter.incrementAndGet();
return CompletableFuture.supplyAsync(supplier);
});
cf = handleAsync.thenCompose(Function.identity());
}
cf = cf.whenCompleteAsync((result, throwable) ->
{
System.out.println("Clean up");
System.out.println("Retry count: " + retryCounter[0]);
// System.out.println("Retry count: " + retryCounter.get());
});
System.out.println("Wating for result...");
System.out.println("Result: " + cf.get());