Rx-Java,同步后调用并行异步

Rx-Java, calling parallel async after a sync

我有几个简单的 java 类 看起来像这样:

 public class Parent {
    public String id;
    public String name;
    public List<String> childIds;
 }

 private class Child {
    public String id;
    public String name;
 }

 private class Output {
    public String name;
    public List<String> childNames;
 }

我想发送 rx-java 一个包含 parent 个 ID 的列表。对于每个 parent Id,rx 需要获取 parent object,然后对于 parent object 中的每个 child id,获取 child object秒。最后,将输出object和return填满即可。

我遇到问题的部分是获取 child object 的调用需要并行完成。这是我到目前为止所拥有的。我使用 collect 函数将 children 添加到输出中(children 是使用 Async.from 获得的)。对于我要尝试做的事情,这似乎很复杂。这是最好的方法,还是有更好的方法?

  private class ToOutput implements Func1<Parent, Observable<Output>> {
    private Observable<Child> getChildren(Parent p) {
      Observable<String> childIds = Observable.from(p.childIds);
      return childIds.flatMap(new Func1<String, Observable<Child>>() {

        @Override
        public Observable<Child> call(String sku) {
          return Async.fromCallable(new ToChild(sku), Schedulers.io());
        }
      });
    }

    @Override
    public Observable<Output> call(final Parent p) {

      Func0<Output> initialState = new Func0<Output>() {
        @Override
        public Output call() {
          final Output output = new Output();
          output.name = p.name;
          output.childNames = new ArrayList<String>();
          return output;
        }
      };

      Observable<Child> oChildren = getChildren(p);
      Observable<Output> output = oChildren.collect(initialState, new Action2<Output, Child>() {
        @Override
        public void call(Output output, Child child) {
          output.childNames.add(child.name);
        }
      });
      return output;
    }
  }

  @Test
  public void test1() throws Exception {
    Observable<String> ids = Observable.just("1", "2", "3");
    Observable<Parent> parents = getParent(ids);
    Observable<Output> output = parents.flatMap(new ToOutput());
    ... do something with output;
  }

您可以定义一个方法

而不是使用 Async.fromCallable
Observable<Child> getChild(String childId)

这基本上是 ToChild.call 目前所做的。

然后在flatMap中,可以使用subscribeOn(Schedulers.io())得到并行度:

Observable<Child> getChildren(Parent p) {
    return Observable.from(p.childIds)
        .flatMap(new Func1<String, Observable<Child>>() {
            public Observable<Child> call(String childId) {
                return getChild(childId).subscribeOn(Schedulers.io());
            }
        });
}

并且在 ToOutput 中,您可以从 toList() 运算符中获益,而不是自己实现它:

class ToOutput implements Func1<Parent, Observable<Output>> {
    public Observable<Output> call(final Parent p) {
        return getChildren(p)
            .map(new Func1<Child, String>() {
                public String call(Child c) {
                    return c.name;
                }
            })
            .toList()
            .map(new Func1<List<String>, Output>() {
                public Output call(List<String> l) {
                    return new Output(p.name, l);
                }
            });
    }
}