Rxjava 事件总线在恢复时再次订阅不起作用

Rxjava event bus subscribing again on resume not working

我是 RxJava 的新手,我正在尝试将 RxJava 用作事件总线。我正在使用 Kaushik Gopal (https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java) 的 RxJava 示例。

这是我的 RxBus 代码

public class RxBus {

 private static RxBus _instance;
 private final Subject<Object, Object> _bus;

 public static synchronized RxBus getInstance(){
    if(_instance == null){
        _instance = new RxBus();
    }
    return _instance;
 }

 private RxBus() {
    _bus = new SerializedSubject<>(PublishSubject.create());
 }

 public void send(Object o) {
    _bus.onNext(o);
 }

 public Observable<Object> toObserverable() {
    return _bus;
 }

 public boolean hasObservers() {
    return _bus.hasObservers();
 }
}

我在另一个线程上发帖,但我在 UI 线程上订阅。这是我的订阅码

public abstract class BaseActivity extends AppCompatActivity {

        protected CompositeSubscription _subscriptions;

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            ...
            _subscriptions = new CompositeSubscription();
            ...
        }

        @Override
        protected void onResume() {

            ....
            _subscriptions.add(RxBus.getInstance().toObserverable()
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<Object>() {
                        @Override
                        public void call(final Object event) {
                            //Do something here
                        }
                    }));

            .....        
        }

        @Override
        protected void onPause() {

            ......
            _subscriptions.unsubscribe();
    }
}

当我第一次加载 activity 时,我确实收到了事件。但是,如果我再次关闭并打开 activity(触发 onPause 和 onResume),我将停止获取事件。我在这里遗漏了什么吗?这是将 RxJava 用于事件总线的正确方法吗?我再次在不同的线程上发帖,但我订阅了 UI 线程。谢谢

我想通了这个问题。必须调用 _subscriptions.clear() 而不是 _subscriptions.unsubscribe();