从远程重用 RxJava 流 API
Reuse RxJava stream from a remote API
我有一个 API 调用,我想使用 Observable 包装它:
private Observable<RealmResults<Account>> getAccounts() {
final Observable<RealmResults<Account>> realmAccounts =
Observable.defer(new Func0<Observable<RealmResults<Account>>>() {
@Override
public Observable<RealmResults<Account>> call() {
return RealmObservable.results(getActivity(), new Func1<Realm, RealmResults<Account>>() {
@Override
public RealmResults<Account> call(Realm realm) {
return realm.where(Account.class).findAll();
}
});
}
});
return Observable
.create(new Observable.OnSubscribe<RealmResults<Account>>() {
@Override
public void call(final Subscriber<? super RealmResults<Account>> subscriber) {
DataBridge.getAccounts(Preferences.getString(Constant.ME_GUID, ""), new OnResponseListener() {
@Override
public void OnSuccess(Object data) {
Log.d("Stream", "onSuccess");
realmAccounts.subscribe(subscriber);
}
@Override
public void onFailure(Object data) {
subscriber.onError(new Exception(data.toString()));
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.startWith(realmAccounts);
}
我喜欢用它
Observable<Accounts> accounts = getAccounts().flatMap(
new Func1<RealmResults<Account>, Observable<Account>>() {
@Override
public Observable<Account> call(RealmResults<Account> accounts) {
return Observable.from(accounts);
}
});
如何多次使用 accounts
observable 而无需每次都调用 API。我需要处理帐户流并从中提取不同的数据集。
最简单的方法是使用运算符cache, which internally uses ReplaySubject。它缓存源可观察项,然后从缓存中提供结果。
...
Observable<<RealmResults<Account>> cachedResult = getAccounts().cache();
Observable<Accounts> accountsObservable = cachedResult.flatMap(...);
Observable<X> xObservable = cachedResult.flatMap(...);
如果您想避免缓存结果,您应该使用 Connectable Observables。通常它只对 Hot Observables 有意义。 Connectable observable 在其 Connect 方法被调用之前不会开始发射项目。您可以使用 publish 运算符转换为 Connectable Observable。
ConnectableObservable<<RealmResults<Account>> connectebleObservable = getAccounts().publish();
Observable<Accounts> accountsObservable = connectebleObservable .flatMap(...);
Observable<X> xObservable = connectebleObservable .flatMap(...);
//You must subscribe before connect
accountsObservable.subsribe(...);
xObservable.subscribe(...);
//start emiting data
connectebleObservable.connect();
这里要注意的是,你必须在连接之前订阅 - 以避免数据丢失 - 否则你必须使用重播运算符,它类似于缓存运算符,但用于可连接的可观察
那么分享呢?
它创建 ConnectableObservable 并将其作为常规 Observable 公开。首次订阅自动导致连接和发射。
在您的案例中使用的共享,不重放可能会导致数据丢失或多次执行,具体取决于时间。
例如,对于 2 个订阅者和流中的一个项目,您可能有以下情况:
- 2 个订阅在 onNext 之前创建 - 按预期工作。
- 第二个订阅在 onNext 之后但在 onComplete 之前创建 - 第二个订阅仅获得 onComplete
- 在 onComplete 之后创建的第二个订阅 - 2 次执行没有缓存
我有一个 API 调用,我想使用 Observable 包装它:
private Observable<RealmResults<Account>> getAccounts() {
final Observable<RealmResults<Account>> realmAccounts =
Observable.defer(new Func0<Observable<RealmResults<Account>>>() {
@Override
public Observable<RealmResults<Account>> call() {
return RealmObservable.results(getActivity(), new Func1<Realm, RealmResults<Account>>() {
@Override
public RealmResults<Account> call(Realm realm) {
return realm.where(Account.class).findAll();
}
});
}
});
return Observable
.create(new Observable.OnSubscribe<RealmResults<Account>>() {
@Override
public void call(final Subscriber<? super RealmResults<Account>> subscriber) {
DataBridge.getAccounts(Preferences.getString(Constant.ME_GUID, ""), new OnResponseListener() {
@Override
public void OnSuccess(Object data) {
Log.d("Stream", "onSuccess");
realmAccounts.subscribe(subscriber);
}
@Override
public void onFailure(Object data) {
subscriber.onError(new Exception(data.toString()));
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.startWith(realmAccounts);
}
我喜欢用它
Observable<Accounts> accounts = getAccounts().flatMap(
new Func1<RealmResults<Account>, Observable<Account>>() {
@Override
public Observable<Account> call(RealmResults<Account> accounts) {
return Observable.from(accounts);
}
});
如何多次使用 accounts
observable 而无需每次都调用 API。我需要处理帐户流并从中提取不同的数据集。
最简单的方法是使用运算符cache, which internally uses ReplaySubject。它缓存源可观察项,然后从缓存中提供结果。
...
Observable<<RealmResults<Account>> cachedResult = getAccounts().cache();
Observable<Accounts> accountsObservable = cachedResult.flatMap(...);
Observable<X> xObservable = cachedResult.flatMap(...);
如果您想避免缓存结果,您应该使用 Connectable Observables。通常它只对 Hot Observables 有意义。 Connectable observable 在其 Connect 方法被调用之前不会开始发射项目。您可以使用 publish 运算符转换为 Connectable Observable。
ConnectableObservable<<RealmResults<Account>> connectebleObservable = getAccounts().publish();
Observable<Accounts> accountsObservable = connectebleObservable .flatMap(...);
Observable<X> xObservable = connectebleObservable .flatMap(...);
//You must subscribe before connect
accountsObservable.subsribe(...);
xObservable.subscribe(...);
//start emiting data
connectebleObservable.connect();
这里要注意的是,你必须在连接之前订阅 - 以避免数据丢失 - 否则你必须使用重播运算符,它类似于缓存运算符,但用于可连接的可观察
那么分享呢?
它创建 ConnectableObservable 并将其作为常规 Observable 公开。首次订阅自动导致连接和发射。
在您的案例中使用的共享,不重放可能会导致数据丢失或多次执行,具体取决于时间。 例如,对于 2 个订阅者和流中的一个项目,您可能有以下情况:
- 2 个订阅在 onNext 之前创建 - 按预期工作。
- 第二个订阅在 onNext 之后但在 onComplete 之前创建 - 第二个订阅仅获得 onComplete
- 在 onComplete 之后创建的第二个订阅 - 2 次执行没有缓存