使用 RxJava 遍历 Vertx JsonArray
Traversing a Vertx JsonArray using RxJava
我有一个消耗传入消息的 Verticle。每条消息都是一个顶点JsonObject
,其中包含一个顶点JsonArray
。我想为这个数组中的每个元素执行逻辑。逻辑本身包含在一个单独的 Verticle 中。第二个verticle使用rxVertx
。它定义了几个消费者,每个消费者委托给单独的方法,所有这些方法 return 一个 Observable
.
我的问题是:如何:
- 遍历
JsonArray
中的每个元素
- 将每个元素传递给使用
Observables
的消费者。
在第一个verticle中,尝试了以下方法:
EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list = array.getList();
Observable<Object> observable = Observable.fromArray(list);
observable.flatMapSingle(s -> {
eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe();
对 flatMapSingle
的调用无法编译,因为:
The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})
正确的做法是什么?非常感谢
如果用代码块定义 flatMapSingle
函数参数,则必须使用 return
关键字:
observable.flatMapSingle(s -> {
return eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe(reply -> {
// Handle each reply
});
请注意,flatMapSingle
不保证回复与传入消息的顺序相同。如果您需要该保证,请使用 concatMapSingle
.
我有一个消耗传入消息的 Verticle。每条消息都是一个顶点JsonObject
,其中包含一个顶点JsonArray
。我想为这个数组中的每个元素执行逻辑。逻辑本身包含在一个单独的 Verticle 中。第二个verticle使用rxVertx
。它定义了几个消费者,每个消费者委托给单独的方法,所有这些方法 return 一个 Observable
.
我的问题是:如何:
- 遍历
JsonArray
中的每个元素
- 将每个元素传递给使用
Observables
的消费者。
在第一个verticle中,尝试了以下方法:
EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list = array.getList();
Observable<Object> observable = Observable.fromArray(list);
observable.flatMapSingle(s -> {
eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe();
对 flatMapSingle
的调用无法编译,因为:
The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})
正确的做法是什么?非常感谢
如果用代码块定义 flatMapSingle
函数参数,则必须使用 return
关键字:
observable.flatMapSingle(s -> {
return eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe(reply -> {
// Handle each reply
});
请注意,flatMapSingle
不保证回复与传入消息的顺序相同。如果您需要该保证,请使用 concatMapSingle
.