Java 中 vector/stream 数据的异步、可组合 return 值 9
Asynchronous, composable return value for vector/stream data in Java 9
假设我有一个 API,它会根据某些查询条件查找或构建一个小部件:
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
(同步)客户端代码如下所示:
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
现在说查找或构建小部件的成本高得无法预测,我不希望客户在等待时阻塞。所以我将其更改为:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
客户可以这样写:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
或:
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
现在,假设同步 API 可以 return 0 到 n 个小部件:
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
天真地,我可以写:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
然而,这实际上并没有使代码成为非阻塞的,它只是推动阻塞——要么 Future
阻塞直到所有 Widgets
可用,或者代码遍历等待每个 Widget
的 Stream
个块。我想要的是能让我在每个小部件到达时对其进行处理的东西,例如:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
但这不提供错误处理,即使我添加了一个额外的 Consumer<Throwable> errorHandler
,它也不允许我,例如,将我的小部件检索与其他查询组合起来,或转换结果。
所以我正在寻找一些可组合的东西,它结合了 Stream
的特征(可迭代性、可转换性)和 CompletableFuture
的特征(异步结果和错误处理)。 (而且,当我们这样做时,背压可能会很好。)
这是java.util.concurrent.Flow.Publisher? An io.reactivex.Observable吗?更复杂的东西?更简单的东西?
您的用例很自然地属于 RxJava 所处理的世界。如果我们有一个可观察的:
Observable<Widget> getMatchingWidgets(wc);
根据条件生成零个或多个小部件,然后您可以使用以下方法处理每个小部件:
getMatchingWidgets(wc)
.subscribeOn( backgroundScheduler )
.subscribe( w -> processWidget(w),
error -> handleError(error) );
可观察链将 运行 在 backgroundScheduler
上,它通常是线程池执行程序服务的包装器。如果你需要对你的UI中的每个widget进行最后的处理,你可以在处理之前使用observeOn()
运算符切换到UI调度器:
getMatchingWidgets(wc)
.subscribeOn( backgroundScheduler )
.observeOn( uiScheduler )
.subscribe( w -> processWidget(w),
error -> handleError(error) );
对我来说,RxJava 方法的优雅之处在于它以流畅的方式处理了管道管理的许多具体细节。看看那个观察者链,你就知道发生了什么,发生在哪里。
假设我有一个 API,它会根据某些查询条件查找或构建一个小部件:
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
(同步)客户端代码如下所示:
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
现在说查找或构建小部件的成本高得无法预测,我不希望客户在等待时阻塞。所以我将其更改为:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
客户可以这样写:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
或:
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
现在,假设同步 API 可以 return 0 到 n 个小部件:
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
天真地,我可以写:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
然而,这实际上并没有使代码成为非阻塞的,它只是推动阻塞——要么 Future
阻塞直到所有 Widgets
可用,或者代码遍历等待每个 Widget
的 Stream
个块。我想要的是能让我在每个小部件到达时对其进行处理的东西,例如:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
但这不提供错误处理,即使我添加了一个额外的 Consumer<Throwable> errorHandler
,它也不允许我,例如,将我的小部件检索与其他查询组合起来,或转换结果。
所以我正在寻找一些可组合的东西,它结合了 Stream
的特征(可迭代性、可转换性)和 CompletableFuture
的特征(异步结果和错误处理)。 (而且,当我们这样做时,背压可能会很好。)
这是java.util.concurrent.Flow.Publisher? An io.reactivex.Observable吗?更复杂的东西?更简单的东西?
您的用例很自然地属于 RxJava 所处理的世界。如果我们有一个可观察的:
Observable<Widget> getMatchingWidgets(wc);
根据条件生成零个或多个小部件,然后您可以使用以下方法处理每个小部件:
getMatchingWidgets(wc)
.subscribeOn( backgroundScheduler )
.subscribe( w -> processWidget(w),
error -> handleError(error) );
可观察链将 运行 在 backgroundScheduler
上,它通常是线程池执行程序服务的包装器。如果你需要对你的UI中的每个widget进行最后的处理,你可以在处理之前使用observeOn()
运算符切换到UI调度器:
getMatchingWidgets(wc)
.subscribeOn( backgroundScheduler )
.observeOn( uiScheduler )
.subscribe( w -> processWidget(w),
error -> handleError(error) );
对我来说,RxJava 方法的优雅之处在于它以流畅的方式处理了管道管理的许多具体细节。看看那个观察者链,你就知道发生了什么,发生在哪里。