Rx-Java,同步后调用并行异步
Rx-Java, calling parallel async after a sync
我有几个简单的 java 类 看起来像这样:
public class Parent {
public String id;
public String name;
public List<String> childIds;
}
private class Child {
public String id;
public String name;
}
private class Output {
public String name;
public List<String> childNames;
}
我想发送 rx-java 一个包含 parent 个 ID 的列表。对于每个 parent Id,rx 需要获取 parent object,然后对于 parent object 中的每个 child id,获取 child object秒。最后,将输出object和return填满即可。
我遇到问题的部分是获取 child object 的调用需要并行完成。这是我到目前为止所拥有的。我使用 collect 函数将 children 添加到输出中(children 是使用 Async.from 获得的)。对于我要尝试做的事情,这似乎很复杂。这是最好的方法,还是有更好的方法?
private class ToOutput implements Func1<Parent, Observable<Output>> {
private Observable<Child> getChildren(Parent p) {
Observable<String> childIds = Observable.from(p.childIds);
return childIds.flatMap(new Func1<String, Observable<Child>>() {
@Override
public Observable<Child> call(String sku) {
return Async.fromCallable(new ToChild(sku), Schedulers.io());
}
});
}
@Override
public Observable<Output> call(final Parent p) {
Func0<Output> initialState = new Func0<Output>() {
@Override
public Output call() {
final Output output = new Output();
output.name = p.name;
output.childNames = new ArrayList<String>();
return output;
}
};
Observable<Child> oChildren = getChildren(p);
Observable<Output> output = oChildren.collect(initialState, new Action2<Output, Child>() {
@Override
public void call(Output output, Child child) {
output.childNames.add(child.name);
}
});
return output;
}
}
@Test
public void test1() throws Exception {
Observable<String> ids = Observable.just("1", "2", "3");
Observable<Parent> parents = getParent(ids);
Observable<Output> output = parents.flatMap(new ToOutput());
... do something with output;
}
您可以定义一个方法
而不是使用 Async.fromCallable
Observable<Child> getChild(String childId)
这基本上是 ToChild.call
目前所做的。
然后在flatMap
中,可以使用subscribeOn(Schedulers.io())
得到并行度:
Observable<Child> getChildren(Parent p) {
return Observable.from(p.childIds)
.flatMap(new Func1<String, Observable<Child>>() {
public Observable<Child> call(String childId) {
return getChild(childId).subscribeOn(Schedulers.io());
}
});
}
并且在 ToOutput
中,您可以从 toList()
运算符中获益,而不是自己实现它:
class ToOutput implements Func1<Parent, Observable<Output>> {
public Observable<Output> call(final Parent p) {
return getChildren(p)
.map(new Func1<Child, String>() {
public String call(Child c) {
return c.name;
}
})
.toList()
.map(new Func1<List<String>, Output>() {
public Output call(List<String> l) {
return new Output(p.name, l);
}
});
}
}
我有几个简单的 java 类 看起来像这样:
public class Parent {
public String id;
public String name;
public List<String> childIds;
}
private class Child {
public String id;
public String name;
}
private class Output {
public String name;
public List<String> childNames;
}
我想发送 rx-java 一个包含 parent 个 ID 的列表。对于每个 parent Id,rx 需要获取 parent object,然后对于 parent object 中的每个 child id,获取 child object秒。最后,将输出object和return填满即可。
我遇到问题的部分是获取 child object 的调用需要并行完成。这是我到目前为止所拥有的。我使用 collect 函数将 children 添加到输出中(children 是使用 Async.from 获得的)。对于我要尝试做的事情,这似乎很复杂。这是最好的方法,还是有更好的方法?
private class ToOutput implements Func1<Parent, Observable<Output>> {
private Observable<Child> getChildren(Parent p) {
Observable<String> childIds = Observable.from(p.childIds);
return childIds.flatMap(new Func1<String, Observable<Child>>() {
@Override
public Observable<Child> call(String sku) {
return Async.fromCallable(new ToChild(sku), Schedulers.io());
}
});
}
@Override
public Observable<Output> call(final Parent p) {
Func0<Output> initialState = new Func0<Output>() {
@Override
public Output call() {
final Output output = new Output();
output.name = p.name;
output.childNames = new ArrayList<String>();
return output;
}
};
Observable<Child> oChildren = getChildren(p);
Observable<Output> output = oChildren.collect(initialState, new Action2<Output, Child>() {
@Override
public void call(Output output, Child child) {
output.childNames.add(child.name);
}
});
return output;
}
}
@Test
public void test1() throws Exception {
Observable<String> ids = Observable.just("1", "2", "3");
Observable<Parent> parents = getParent(ids);
Observable<Output> output = parents.flatMap(new ToOutput());
... do something with output;
}
您可以定义一个方法
而不是使用Async.fromCallable
Observable<Child> getChild(String childId)
这基本上是 ToChild.call
目前所做的。
然后在flatMap
中,可以使用subscribeOn(Schedulers.io())
得到并行度:
Observable<Child> getChildren(Parent p) {
return Observable.from(p.childIds)
.flatMap(new Func1<String, Observable<Child>>() {
public Observable<Child> call(String childId) {
return getChild(childId).subscribeOn(Schedulers.io());
}
});
}
并且在 ToOutput
中,您可以从 toList()
运算符中获益,而不是自己实现它:
class ToOutput implements Func1<Parent, Observable<Output>> {
public Observable<Output> call(final Parent p) {
return getChildren(p)
.map(new Func1<Child, String>() {
public String call(Child c) {
return c.name;
}
})
.toList()
.map(new Func1<List<String>, Output>() {
public Output call(List<String> l) {
return new Output(p.name, l);
}
});
}
}