使用 callbacks/listeners 链接 RxJava observable
Chaining RxJava observables with callbacks/listeners
我正在将 Retrofit 与 Observables 一起使用,并且想链接这些 observables。通常它与像 map()
或 flatMap()
这样的函数配合得很好,因为 api
returns 是一个完成任务的 Observable。但在这种情况下,我必须执行以下操作:
- getKey() 来自
api
- 在另一个库中使用返回的密钥
Foo
并等待回调被调用。
- 当回调returns时,将结果发送给
api
。
我希望这是一个单链调用,这样我只需订阅一次。我猜我可以使用 merge()
或 join()
或其他东西,但不确定处理回调的最佳方法是什么。
有没有办法让它变得更好?这是我目前所拥有的:
api.getKey().subscribe(new Action1<String>() {
@Override
public void call(String key) {
Foo foo = new Foo();
foo.setAwesomeCallback(new AwesomeCallback() {
@Override
public void onAwesomeReady(String awesome) {
api.sendAwesome(awesome)
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
handleAwesomeSent();
}
});
}
});
foo.makeAwesome();
}
});
您必须使用 PublishSubject 将基于回调的 API 转换为可观察对象。
尝试类似的方法(未测试):
api.getKey().flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String key) {
Foo foo = new Foo();
PublishSubject<String> subject = PublishSubject.create();
foo.setAwesomeCallback(new AwesomeCallback() {
@Override
public void onAwesomeReady(String awesome) {
subject.onNext(awesome);
subject.onComplete();
}
});
foo.makeAwesome();
return subject;
}
}).flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String awesome) {
return sendAwesome(awesome);
}
}).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
handleAwesomeSent();
}
});
采用 clemp6r 的解决方案,这是另一个既不需要 Subjects
也不需要嵌套 Subscriptions
的解决方案:
api.getKey().flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String key) {
return Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(final Subscriber<? super String> subscriber) {
Foo foo = new Foo();
foo.setAwesomeCallback(new AwesomeCallback() {
@Override
public void onAwesomeReady(String awesome) {
if (! subscriber.isUnsubscribed()) {
subscriber.onNext(awesome);
subscriber.onComplete();
}
}
});
foo.makeAwesome();
}
});
}).flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String awesome) {
return sendAwesome(awesome);
}
}).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
handleAwesomeSent();
}
});
总的来说,我认为总是可以使用 Observable.create()
.
将任何基于回调的异步操作包装在 Observable
中
Api api = new Api() {
@Override Single<String> getKey() {
return Single.just("apiKey");
}
};
api.getKey()
.flatMap(key -> Single.<String>create( singleSubscriber -> {
Foo foo = new Foo();
foo.setAwesomeCallback(awesome -> {
try { singleSubscriber.onSuccess(awesome);}
catch (Exception e) { singleSubscriber.onError(e); }
});
foo.makeAwesome();
}))
.flatMapCompletable(
awesome -> Completable.create(completableSubscriber -> {
try {
sendAwesome(awesome);
completableSubscriber.onCompleted();
} catch (Exception e) { completableSubscriber.onError(e); }
}))
.subscribe(this::handleAwesomeSent, throwable -> { });
See gist for full anonymous class example
此实现通过使用 Single
和 Completable
类型以及 flatMapCompletable()
类型来适应 david.mihola 答案,同时是 safe/specific 类型。
我正在将 Retrofit 与 Observables 一起使用,并且想链接这些 observables。通常它与像 map()
或 flatMap()
这样的函数配合得很好,因为 api
returns 是一个完成任务的 Observable。但在这种情况下,我必须执行以下操作:
- getKey() 来自
api
- 在另一个库中使用返回的密钥
Foo
并等待回调被调用。 - 当回调returns时,将结果发送给
api
。
我希望这是一个单链调用,这样我只需订阅一次。我猜我可以使用 merge()
或 join()
或其他东西,但不确定处理回调的最佳方法是什么。
有没有办法让它变得更好?这是我目前所拥有的:
api.getKey().subscribe(new Action1<String>() {
@Override
public void call(String key) {
Foo foo = new Foo();
foo.setAwesomeCallback(new AwesomeCallback() {
@Override
public void onAwesomeReady(String awesome) {
api.sendAwesome(awesome)
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
handleAwesomeSent();
}
});
}
});
foo.makeAwesome();
}
});
您必须使用 PublishSubject 将基于回调的 API 转换为可观察对象。
尝试类似的方法(未测试):
api.getKey().flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String key) {
Foo foo = new Foo();
PublishSubject<String> subject = PublishSubject.create();
foo.setAwesomeCallback(new AwesomeCallback() {
@Override
public void onAwesomeReady(String awesome) {
subject.onNext(awesome);
subject.onComplete();
}
});
foo.makeAwesome();
return subject;
}
}).flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String awesome) {
return sendAwesome(awesome);
}
}).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
handleAwesomeSent();
}
});
采用 clemp6r 的解决方案,这是另一个既不需要 Subjects
也不需要嵌套 Subscriptions
的解决方案:
api.getKey().flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String key) {
return Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(final Subscriber<? super String> subscriber) {
Foo foo = new Foo();
foo.setAwesomeCallback(new AwesomeCallback() {
@Override
public void onAwesomeReady(String awesome) {
if (! subscriber.isUnsubscribed()) {
subscriber.onNext(awesome);
subscriber.onComplete();
}
}
});
foo.makeAwesome();
}
});
}).flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String awesome) {
return sendAwesome(awesome);
}
}).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
handleAwesomeSent();
}
});
总的来说,我认为总是可以使用 Observable.create()
.
Observable
中
Api api = new Api() {
@Override Single<String> getKey() {
return Single.just("apiKey");
}
};
api.getKey()
.flatMap(key -> Single.<String>create( singleSubscriber -> {
Foo foo = new Foo();
foo.setAwesomeCallback(awesome -> {
try { singleSubscriber.onSuccess(awesome);}
catch (Exception e) { singleSubscriber.onError(e); }
});
foo.makeAwesome();
}))
.flatMapCompletable(
awesome -> Completable.create(completableSubscriber -> {
try {
sendAwesome(awesome);
completableSubscriber.onCompleted();
} catch (Exception e) { completableSubscriber.onError(e); }
}))
.subscribe(this::handleAwesomeSent, throwable -> { });
See gist for full anonymous class example
此实现通过使用 Single
和 Completable
类型以及 flatMapCompletable()
类型来适应 david.mihola 答案,同时是 safe/specific 类型。