RxJava- 在 Observable 链中执行 peek() 或 void 操作?

RxJava- performing a peek() or void operation within an Observable chain?

Java 8 lambda 流有一个 peek() 运算符,它允许您对每个项目执行 void 操作。这通常用于调试,但它也是一种在不映射到某些东西的情况下欺骗和启动无效操作的好方法。

在 RxJava 中是否有与此等效的内容?也许我没有遵循良好的做法或没有足够的反应性思考......但是在操作之前和之后创建状态标签真的很方便吗?如果不支持peek(),有没有更好的模式可以遵循?

Observable<Item> Item= ...;

Label statusLabel = new Label();
Label resultLabel = new Label();

Observable<CalculatedItem> calculatedItem = calculated.subscribeOn(Schedulers.computation())
.peek(c -> statusLabel.setText("Working.."))
.map(c -> performExpensiveCalculation(c))
.peek(r -> statusLabel.setText(""));

calculatedItem.subscribe(c -> resultLabel.setText(c));

有一个方法 doOnNext(Action1<Item> action) 将为流中的每个项目调用。

Documentation