RXJava顺序执行可观察
RXJava Sequentially execute observable
我有多个 return 和 Observable<String>
的功能。每个函数在文件系统上执行命令。我需要一个接一个地执行每个函数,并在 observable 中获取函数的输出。最后我想要一个 Observable<String>
包含函数调用顺序的所有函数的输出
每个函数都按预期工作,但我需要正确合并输出。
我试过 Observable.concatArray(func1, func2, ... ) 像这样:
return Observable.concatArray(
func1(),
func2(),
func3(),
func4()
);
但这只是保留了可观察事件的顺序。不是功能的顺序。我的意思是如果 func1 发出事件 A 和 A' et func2 发出 B 和 B',我将有 A->A'->B->B'。但是 func2 将在 func1 之后立即启动。这导致我的问题是 func1 需要在 func2 开始之前完成。
第一个函数通过maven在文件系统上生成目录。因此,一项持续时间长的任务。第二,在这个目录中写入一个文件。但是 concatArray 在第一个之后立即启动第二个。并且命令失败,因为此时目录不存在。
有没有办法避免像这样丑陋的事情:
Subject<String> result = PublishSubject.create();
Observable<String> func1Obs = funct1();
Observable<String> func2Obs = funct2();
func1Obs.subscribe(output -> result.onNext(output));
func1Obs.onDoComplete(() -> {
func2Obs.subscribe(output -> result.onNext(output);
}
return result;
作为Progman的建议,错误不在于concatArray,这是要使用的方法。问题是,在我的函数列表中,我使用了这种代码:
public Observable<String> func1() {
Subject<String> result = PublishSubject.create();
String output = dosomething()
result.onNext(output);
}
这里的问题是在创建可观察对象时会立即调用函数 doSomething()。
如果您需要 onNext、onComplete 等,解决方案是使用 Observable.create()
...:
public Observable<String> func1() {
// See how we wrap our instruction inside create method
return Observable.create( result -> {
String output = dosomething()
result.onNext(output);
});
}
或Observable.defer()
,如果您只需要等待订阅:
public Observable<String> func1() {
// See how we wrap our instruction inside create method
return Observable.defer( () -> dosomething());
}
之后您可以拨打:
return Observable.concatArray(
func1(),
func2(),
func3(),
func4()
);
我有多个 return 和 Observable<String>
的功能。每个函数在文件系统上执行命令。我需要一个接一个地执行每个函数,并在 observable 中获取函数的输出。最后我想要一个 Observable<String>
包含函数调用顺序的所有函数的输出
每个函数都按预期工作,但我需要正确合并输出。
我试过 Observable.concatArray(func1, func2, ... ) 像这样:
return Observable.concatArray(
func1(),
func2(),
func3(),
func4()
);
但这只是保留了可观察事件的顺序。不是功能的顺序。我的意思是如果 func1 发出事件 A 和 A' et func2 发出 B 和 B',我将有 A->A'->B->B'。但是 func2 将在 func1 之后立即启动。这导致我的问题是 func1 需要在 func2 开始之前完成。
第一个函数通过maven在文件系统上生成目录。因此,一项持续时间长的任务。第二,在这个目录中写入一个文件。但是 concatArray 在第一个之后立即启动第二个。并且命令失败,因为此时目录不存在。
有没有办法避免像这样丑陋的事情:
Subject<String> result = PublishSubject.create();
Observable<String> func1Obs = funct1();
Observable<String> func2Obs = funct2();
func1Obs.subscribe(output -> result.onNext(output));
func1Obs.onDoComplete(() -> {
func2Obs.subscribe(output -> result.onNext(output);
}
return result;
作为Progman的建议,错误不在于concatArray,这是要使用的方法。问题是,在我的函数列表中,我使用了这种代码:
public Observable<String> func1() {
Subject<String> result = PublishSubject.create();
String output = dosomething()
result.onNext(output);
}
这里的问题是在创建可观察对象时会立即调用函数 doSomething()。
如果您需要 onNext、onComplete 等,解决方案是使用 Observable.create()
...:
public Observable<String> func1() {
// See how we wrap our instruction inside create method
return Observable.create( result -> {
String output = dosomething()
result.onNext(output);
});
}
或Observable.defer()
,如果您只需要等待订阅:
public Observable<String> func1() {
// See how we wrap our instruction inside create method
return Observable.defer( () -> dosomething());
}
之后您可以拨打:
return Observable.concatArray(
func1(),
func2(),
func3(),
func4()
);