CompletableFuture.supplyAsync() 没有 Lambda
CompletableFuture.supplyAsync() without Lambda
我正在努力使用 Supplier<U>
等函数式风格并创建可测试的代码。
所以我有一个 InputStream
,它被分成异步处理的块,我想知道它们何时全部完成。为了编写可测试的代码,我将处理逻辑外包给它自己 Runnable
:
public class StreamProcessor {
public CompletableFuture<Void> process(InputStream in) {
List<CompletableFuture> futures = new ArrayList<>();
while (true) {
try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
byte[] data = IOUtils.toByteArray(chunkStream);
CompletableFuture<Void> f = CompletableFuture.runAsync(createTask(data));
futures.add(f);
} catch (EOFException ex) {
// end of stream reached
break;
} catch (IOException ex) {
return CompletableFuture.failedFuture(ex);
}
}
return CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new));
}
ChunkTask createTask(byte[] data) {
return new ChunkTask(data);
}
public class ChunkTask implements Runnable {
final byte[] data;
ChunkTask(byte[] data) {
this.data = data;
}
@Override
public void run() {
try {
// do something
} catch (Exception ex) {
// checked exceptions must be wrapped
throw new RuntimeException(ex);
}
}
}
}
这很好用,但有两个问题:
- 处理代码不能return任何东西;毕竟是
Runnable
- 任何在
ChunkTask.run()
中捕获的已检查异常都必须包装到 RuntimeException
中。展开失败的组合 CompletableFuture
returns RuntimeException
需要再次展开 以达到原始原因 - 与 [=20 形成对比=].
所以我正在寻找一种使用 CompletableFuture.supplyAsync()
执行此操作的方法,但我不知道如何在没有 lambda 表达式(测试不佳)或 return 的情况下执行此操作 CompletableFuture.failedFuture()
来自处理逻辑。
我可以想到两种方法:
1.随着 supplyAsync
:
使用 CompletableFuture.supplyAsync
时,您需要供应商而不是可运行的:
public static class ChunkTask implements Supplier<Object> {
final byte[] data;
ChunkTask(byte[] data) {
this.data = data;
}
@Override
public Object get() {
Object result = ...;
// Do something or throw an exception
return result;
}
}
然后:
CompletableFuture
.supplyAsync( new ChunkTask( data ) )
.whenComplete( (result, throwable) -> ... );
如果 Supplier.get()
中发生异常,它将被传播,您可以在 CompletableFuture.whenComplete
、CompletableFuture.handle
或 CompletableFuture.exceptionally
.
中看到它
2。将 CompletableFuture
传递给线程
您可以将 CompletableFuture
传递给 ChunkTask
:
public class ChunkTask implements Runnable {
final byte[] data;
private final CompletableFuture<Object> future;
ChunkTask(byte[] data, CompletableFuture<Object> future) {
this.data = data;
this.future = future;
}
@Override
public void run() {
try {
Object result = null;
// do something
future.complete( result );
} catch (Throwable ex) {
future.completeExceptionally( ex );
}
}
}
那么逻辑就变成了:
while (true) {
CompletableFuture<Object> f = new CompletableFuture<>();
try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
byte[] data = IOUtils.toByteArray(chunkStream);
startThread(new ChunkTask(data, f));
futures.add(f);
} catch (EOFException ex) {
// end of stream reached
break;
} catch (IOException ex) {
f.completeExceptionally( ex );
return f;
}
}
可能,第 2 条可以让您更灵活地管理异常。
我正在努力使用 Supplier<U>
等函数式风格并创建可测试的代码。
所以我有一个 InputStream
,它被分成异步处理的块,我想知道它们何时全部完成。为了编写可测试的代码,我将处理逻辑外包给它自己 Runnable
:
public class StreamProcessor {
public CompletableFuture<Void> process(InputStream in) {
List<CompletableFuture> futures = new ArrayList<>();
while (true) {
try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
byte[] data = IOUtils.toByteArray(chunkStream);
CompletableFuture<Void> f = CompletableFuture.runAsync(createTask(data));
futures.add(f);
} catch (EOFException ex) {
// end of stream reached
break;
} catch (IOException ex) {
return CompletableFuture.failedFuture(ex);
}
}
return CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new));
}
ChunkTask createTask(byte[] data) {
return new ChunkTask(data);
}
public class ChunkTask implements Runnable {
final byte[] data;
ChunkTask(byte[] data) {
this.data = data;
}
@Override
public void run() {
try {
// do something
} catch (Exception ex) {
// checked exceptions must be wrapped
throw new RuntimeException(ex);
}
}
}
}
这很好用,但有两个问题:
- 处理代码不能return任何东西;毕竟是
Runnable
- 任何在
ChunkTask.run()
中捕获的已检查异常都必须包装到RuntimeException
中。展开失败的组合CompletableFuture
returnsRuntimeException
需要再次展开 以达到原始原因 - 与 [=20 形成对比=].
所以我正在寻找一种使用 CompletableFuture.supplyAsync()
执行此操作的方法,但我不知道如何在没有 lambda 表达式(测试不佳)或 return 的情况下执行此操作 CompletableFuture.failedFuture()
来自处理逻辑。
我可以想到两种方法:
1.随着 supplyAsync
:
使用 CompletableFuture.supplyAsync
时,您需要供应商而不是可运行的:
public static class ChunkTask implements Supplier<Object> {
final byte[] data;
ChunkTask(byte[] data) {
this.data = data;
}
@Override
public Object get() {
Object result = ...;
// Do something or throw an exception
return result;
}
}
然后:
CompletableFuture
.supplyAsync( new ChunkTask( data ) )
.whenComplete( (result, throwable) -> ... );
如果 Supplier.get()
中发生异常,它将被传播,您可以在 CompletableFuture.whenComplete
、CompletableFuture.handle
或 CompletableFuture.exceptionally
.
2。将 CompletableFuture
传递给线程
您可以将 CompletableFuture
传递给 ChunkTask
:
public class ChunkTask implements Runnable {
final byte[] data;
private final CompletableFuture<Object> future;
ChunkTask(byte[] data, CompletableFuture<Object> future) {
this.data = data;
this.future = future;
}
@Override
public void run() {
try {
Object result = null;
// do something
future.complete( result );
} catch (Throwable ex) {
future.completeExceptionally( ex );
}
}
}
那么逻辑就变成了:
while (true) {
CompletableFuture<Object> f = new CompletableFuture<>();
try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
byte[] data = IOUtils.toByteArray(chunkStream);
startThread(new ChunkTask(data, f));
futures.add(f);
} catch (EOFException ex) {
// end of stream reached
break;
} catch (IOException ex) {
f.completeExceptionally( ex );
return f;
}
}
可能,第 2 条可以让您更灵活地管理异常。