使用 RxJava 遍历 Vertx JsonArray

Traversing a Vertx JsonArray using RxJava

我有一个消耗传入消息的 Verticle。每条消息都是一个顶点JsonObject,其中包含一个顶点JsonArray。我想为这个数组中的每个元素执行逻辑。逻辑本身包含在一个单独的 Verticle 中。第二个verticle使用rxVertx。它定义了几个消费者,每个消费者委托给单独的方法,所有这些方法 return 一个 Observable.

我的问题是:如何:

  1. 遍历JsonArray
  2. 中的每个元素
  3. 将每个元素传递给使用 Observables 的消费者。

在第一个verticle中,尝试了以下方法:

EventBus eb = rxVertx.eventBus();
JsonArray array= incomingMessage.getJsonArray(KEY);
List<Object> list  = array.getList();
Observable<Object> observable = Observable.fromArray(list);

observable.flatMapSingle(s -> {
      eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
   }).subscribe();

flatMapSingle 的调用无法编译,因为:

The method flatMapSingle(Function<? super Object,? extends SingleSource<? extends R>>) in the type Observable<Object> is not applicable for the arguments ((<no type> s) -> {})

正确的做法是什么?非常感谢

如果用代码块定义 flatMapSingle 函数参数,则必须使用 return 关键字:

observable.flatMapSingle(s -> {
  return eb.rxSend(SECOND_VERTICLE_ADDRESS, s);
}).subscribe(reply -> {
  // Handle each reply
});

请注意,flatMapSingle 不保证回复与传入消息的顺序相同。如果您需要该保证,请使用 concatMapSingle.