Rxjava 事件总线在恢复时再次订阅不起作用
Rxjava event bus subscribing again on resume not working
我是 RxJava 的新手,我正在尝试将 RxJava 用作事件总线。我正在使用 Kaushik Gopal (https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java) 的 RxJava 示例。
这是我的 RxBus 代码
public class RxBus {
private static RxBus _instance;
private final Subject<Object, Object> _bus;
public static synchronized RxBus getInstance(){
if(_instance == null){
_instance = new RxBus();
}
return _instance;
}
private RxBus() {
_bus = new SerializedSubject<>(PublishSubject.create());
}
public void send(Object o) {
_bus.onNext(o);
}
public Observable<Object> toObserverable() {
return _bus;
}
public boolean hasObservers() {
return _bus.hasObservers();
}
}
我在另一个线程上发帖,但我在 UI 线程上订阅。这是我的订阅码
public abstract class BaseActivity extends AppCompatActivity {
protected CompositeSubscription _subscriptions;
@Override
protected void onCreate(Bundle savedInstanceState) {
...
_subscriptions = new CompositeSubscription();
...
}
@Override
protected void onResume() {
....
_subscriptions.add(RxBus.getInstance().toObserverable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(final Object event) {
//Do something here
}
}));
.....
}
@Override
protected void onPause() {
......
_subscriptions.unsubscribe();
}
}
当我第一次加载 activity 时,我确实收到了事件。但是,如果我再次关闭并打开 activity(触发 onPause 和 onResume),我将停止获取事件。我在这里遗漏了什么吗?这是将 RxJava 用于事件总线的正确方法吗?我再次在不同的线程上发帖,但我订阅了 UI 线程。谢谢
我想通了这个问题。必须调用 _subscriptions.clear() 而不是 _subscriptions.unsubscribe();
我是 RxJava 的新手,我正在尝试将 RxJava 用作事件总线。我正在使用 Kaushik Gopal (https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java) 的 RxJava 示例。
这是我的 RxBus 代码
public class RxBus {
private static RxBus _instance;
private final Subject<Object, Object> _bus;
public static synchronized RxBus getInstance(){
if(_instance == null){
_instance = new RxBus();
}
return _instance;
}
private RxBus() {
_bus = new SerializedSubject<>(PublishSubject.create());
}
public void send(Object o) {
_bus.onNext(o);
}
public Observable<Object> toObserverable() {
return _bus;
}
public boolean hasObservers() {
return _bus.hasObservers();
}
}
我在另一个线程上发帖,但我在 UI 线程上订阅。这是我的订阅码
public abstract class BaseActivity extends AppCompatActivity {
protected CompositeSubscription _subscriptions;
@Override
protected void onCreate(Bundle savedInstanceState) {
...
_subscriptions = new CompositeSubscription();
...
}
@Override
protected void onResume() {
....
_subscriptions.add(RxBus.getInstance().toObserverable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override
public void call(final Object event) {
//Do something here
}
}));
.....
}
@Override
protected void onPause() {
......
_subscriptions.unsubscribe();
}
}
当我第一次加载 activity 时,我确实收到了事件。但是,如果我再次关闭并打开 activity(触发 onPause 和 onResume),我将停止获取事件。我在这里遗漏了什么吗?这是将 RxJava 用于事件总线的正确方法吗?我再次在不同的线程上发帖,但我订阅了 UI 线程。谢谢
我想通了这个问题。必须调用 _subscriptions.clear() 而不是 _subscriptions.unsubscribe();