RxJava - ReplaySubject 只发射数据两次
RxJava - ReplaySubject only emitting data twice
我是 ReactiveX 的新手,我有一个案例,我希望我的 observable 向迟到的订阅者发送数据(每当观察者订阅时,observable 应该发送它之前发送的相同数据)。我制作了这个 Observable class,它向所有观察者提供 ReplaySubject 的相同实例(它是单例 class)。
/**
 * Lazily created singleton that hands every caller the same replaying
 * {@link ConnectableObservable}.
 *
 * <p>The source emits the single item {@code "XYZ"} and then completes.
 *
 * <p>NOTE(review): both static fields are initialised lazily without any
 * synchronization — confirm that all callers run on a single thread.
 */
public class AccountsObservable {
    /** Shared replaying observable; created on the first {@link #getObservable()} call. */
    private static ConnectableObservable<String> hotObservable;
    /** The shared instance; created on the first {@link #getObject()} call. */
    private static AccountsObservable accountsObservable;

    /**
     * Returns the shared {@code AccountsObservable}, creating it on first use.
     *
     * @return the shared instance
     */
    public static AccountsObservable getObject() {
        if (accountsObservable == null) {
            accountsObservable = new AccountsObservable();
        }
        return accountsObservable;
    }

    /**
     * Returns the shared connectable observable, creating it on first use.
     * Callers must invoke {@code connect()} on the result to start the source.
     *
     * @return the shared replaying observable
     */
    public ConnectableObservable<String> getObservable() {
        if (hotObservable == null) {
            // This used to be written as ReplaySubject.create(ObservableOnSubscribe).
            // ReplaySubject declares no such overload, so that call resolved to the
            // inherited static Observable.create(...) and never built a ReplaySubject.
            // Calling Observable.create directly is the same call, spelled honestly.
            Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("XYZ");
                    emitter.onComplete();
                }
            });
            // replay() is used here where publish() was the noted alternative.
            hotObservable = source.replay();
        }
        return hotObservable;
    }
}
同样,这是创建新观察者实例的观察者class。
/**
 * Adapts an {@link AccountsFetchListener} to an RxJava {@link Observer} of
 * account strings.
 */
public class AccountsObserver {
    /** Callback that receives the forwarded item and error notifications. */
    AccountsFetchListener listener;

    /**
     * @param listener callback notified by observers created from this object
     */
    public AccountsObserver(AccountsFetchListener listener) {
        this.listener = listener;
    }

    /**
     * Creates a new observer that forwards to {@link #listener}.
     *
     * @return a fresh observer on every call
     */
    public Observer<String> getObserver() {
        return new ListenerForwardingObserver();
    }

    /**
     * Observer that relays items and errors to the enclosing object's listener.
     * Subscription and completion notifications are ignored.
     */
    private final class ListenerForwardingObserver implements Observer<String> {
        @Override
        public void onSubscribe(Disposable d) {
            // Nothing to do: the subscription handle is not kept.
        }

        @Override
        public void onNext(String accounts) {
            listener.onSuccess(accounts);
        }

        @Override
        public void onError(Throwable e) {
            // The listener contract carries no error detail, so the throwable is not passed on.
            listener.onFailure();
        }

        @Override
        public void onComplete() {
            // Completion is not reported to the listener.
        }
    }

    /** Callback contract for consumers of the accounts stream. */
    public interface AccountsFetchListener {
        /**
         * Called for each emitted item.
         *
         * @param accounts the emitted value
         */
        void onSuccess(String accounts);

        /** Called when the stream signals an error. */
        void onFailure();
    }
}
这是我用来测试这些 Observable 的函数:
/**
 * Subscribes a fresh logging observer to the shared accounts observable and
 * then connects it. Each received item is logged under the tag "DATA -> ".
 */
private void testObs() {
    AccountsObserver.AccountsFetchListener loggingListener =
            new AccountsObserver.AccountsFetchListener() {
                @Override
                public void onSuccess(String accounts) {
                    Log.e("DATA -> ", accounts);
                }

                @Override
                public void onFailure() {
                    // No-op: failures are not reported in this test.
                }
            };

    ConnectableObservable<String> accountsStream = AccountsObservable.getObject().getObservable();
    // Subscribe first, then connect, so this observer is attached before the source is started.
    accountsStream.subscribe(new AccountsObserver(loggingListener).getObserver());
    accountsStream.connect();
}
我调用了这个函数 "testObs()" 5 次,但它只发出了 2 次数据。问题似乎出在提供 ReplaySubject 实例的 AccountsObservable class 中。谢谢
您的代码运行良好。根据下面引用的这段说明,您的日志是在 logcat 中被抑制了:
We declared an application as too chatty once it logs more than 5 lines a second. Please file a bug against the application's owner that is producing this developer-verbose-debug-level class logging spam. The logs are 256KB, that means the application is creating a DOS attack and shortening the logs timespan to 6 seconds(!) making it useless for all others.
您可以通过将您的应用列入 logcat 的白名单来避免此行为:
adb logcat -P '<pid or uid of your app>'
我是 ReactiveX 的新手,我有一个案例,我希望我的 observable 向迟到的订阅者发送数据(每当观察者订阅时,observable 应该发送它之前发送的相同数据)。我制作了这个 Observable class,它向所有观察者提供 ReplaySubject 的相同实例(它是单例 class)。
/**
 * Lazily created singleton that hands every caller the same replaying
 * {@link ConnectableObservable}.
 *
 * <p>The source emits the single item {@code "XYZ"} and then completes.
 *
 * <p>NOTE(review): both static fields are initialised lazily without any
 * synchronization — confirm that all callers run on a single thread.
 */
public class AccountsObservable {
    /** Shared replaying observable; created on the first {@link #getObservable()} call. */
    private static ConnectableObservable<String> hotObservable;
    /** The shared instance; created on the first {@link #getObject()} call. */
    private static AccountsObservable accountsObservable;

    /**
     * Returns the shared {@code AccountsObservable}, creating it on first use.
     *
     * @return the shared instance
     */
    public static AccountsObservable getObject() {
        if (accountsObservable == null) {
            accountsObservable = new AccountsObservable();
        }
        return accountsObservable;
    }

    /**
     * Returns the shared connectable observable, creating it on first use.
     * Callers must invoke {@code connect()} on the result to start the source.
     *
     * @return the shared replaying observable
     */
    public ConnectableObservable<String> getObservable() {
        if (hotObservable == null) {
            // This used to be written as ReplaySubject.create(ObservableOnSubscribe).
            // ReplaySubject declares no such overload, so that call resolved to the
            // inherited static Observable.create(...) and never built a ReplaySubject.
            // Calling Observable.create directly is the same call, spelled honestly.
            Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("XYZ");
                    emitter.onComplete();
                }
            });
            // replay() is used here where publish() was the noted alternative.
            hotObservable = source.replay();
        }
        return hotObservable;
    }
}
同样,这是创建新观察者实例的观察者class。
/**
 * Adapts an {@link AccountsFetchListener} to an RxJava {@link Observer} of
 * account strings.
 */
public class AccountsObserver {
    /** Callback that receives the forwarded item and error notifications. */
    AccountsFetchListener listener;

    /**
     * @param listener callback notified by observers created from this object
     */
    public AccountsObserver(AccountsFetchListener listener) {
        this.listener = listener;
    }

    /**
     * Creates a new observer that forwards to {@link #listener}.
     *
     * @return a fresh observer on every call
     */
    public Observer<String> getObserver() {
        return new ListenerForwardingObserver();
    }

    /**
     * Observer that relays items and errors to the enclosing object's listener.
     * Subscription and completion notifications are ignored.
     */
    private final class ListenerForwardingObserver implements Observer<String> {
        @Override
        public void onSubscribe(Disposable d) {
            // Nothing to do: the subscription handle is not kept.
        }

        @Override
        public void onNext(String accounts) {
            listener.onSuccess(accounts);
        }

        @Override
        public void onError(Throwable e) {
            // The listener contract carries no error detail, so the throwable is not passed on.
            listener.onFailure();
        }

        @Override
        public void onComplete() {
            // Completion is not reported to the listener.
        }
    }

    /** Callback contract for consumers of the accounts stream. */
    public interface AccountsFetchListener {
        /**
         * Called for each emitted item.
         *
         * @param accounts the emitted value
         */
        void onSuccess(String accounts);

        /** Called when the stream signals an error. */
        void onFailure();
    }
}
这是我用来测试这些 Observable 的函数:
/**
 * Subscribes a fresh logging observer to the shared accounts observable and
 * then connects it. Each received item is logged under the tag "DATA -> ".
 */
private void testObs() {
    AccountsObserver.AccountsFetchListener loggingListener =
            new AccountsObserver.AccountsFetchListener() {
                @Override
                public void onSuccess(String accounts) {
                    Log.e("DATA -> ", accounts);
                }

                @Override
                public void onFailure() {
                    // No-op: failures are not reported in this test.
                }
            };

    ConnectableObservable<String> accountsStream = AccountsObservable.getObject().getObservable();
    // Subscribe first, then connect, so this observer is attached before the source is started.
    accountsStream.subscribe(new AccountsObserver(loggingListener).getObserver());
    accountsStream.connect();
}
我调用了这个函数 "testObs()" 5 次,但它只发出了 2 次数据。问题似乎出在提供 ReplaySubject 实例的 AccountsObservable class 中。谢谢
您的代码运行良好。根据下面引用的这段说明,您的日志是在 logcat 中被抑制了:
We declared an application as too chatty once it logs more than 5 lines a second. Please file a bug against the application's owner that is producing this developer-verbose-debug-level class logging spam. The logs are 256KB, that means the application is creating a DOS attack and shortening the logs timespan to 6 seconds(!) making it useless for all others.
您可以通过将您的应用列入 logcat 的白名单来避免此行为:
adb logcat -P '<pid or uid of your app>'