片段订阅观察者

Fragment subscribe to Observer

我第一次尝试使用 RxJava 实现 Observer/Subscriber。 我收到编译错误:

cannot resolve method subscribe(android.support.v4.app.Fragment) 

在下面指示的行中。所以我没有正确订阅。 我该怎么做?

public class MainActivity extends AppCompatActivity {
    @Override
    protected void onCreate(Bundle savedInstanceState) {

        Fragment myFragment = mTabsPageAdapter.getItem(2);

        Observable<String> loadAndStoreDataObservable = Observable.create(
                new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        try {           
                            <get data from RESTful service>
                            <write data to SQLite db on device>

                            subscriber.onNext("Done");
                            subscriber.onCompleted();
                        }
                        catch (Exception e) {
                            subscriber.onError(e);
                        }
                    }
                }
        )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(myFragment); // cannot resolve method subscribe(android.support.v4.app.Fragment)
    }
}

import android.support.v4.app.Fragment;
public class MyFragment extends Fragment implements Observer<String> {

        @Override                         
        public void onNext(String s) {    
            System.out.println(s);        
        }                                 

        @Override                         
        public void onCompleted() {       
        }                                 

        @Override                         
        public void onError(Throwable e) {
        }
}      

编辑:我更改了 Vladimir Mironov 建议的行。这似乎是必要的,但还不够。实施它然后我在之后的行上得到一个编译错误:
不兼容的类型:必需rx.Observable 找到 rx.Subscription

它建议转换为 (Observable<String>),如下所示:

Observable<String> loadAndStoreDataObservable = (Observable<String>) Observable.create(...)

确实编译无误,但给出了运行时错误:
java.lang.ClassCastException: rx.observers.SafeSubscriber 无法转换为 rx.Observable

编辑 2: 我觉得应该是:Subscription loadAndStoreDataObservable = ...

没有人站出来,所以我会把评论汇总成一个答案。

  1. 投射到 MyFragment:

    MyFragment myFragment = (MyFragment) mTabsPageAdapter.getItem(2);
    
  2. 将 loadAndStoreDataObservable 更改为 Subscription

    Subscription loadAndStoreDataObservable = ...
    
  3. OnDestroy()中取消订阅:

    protected void onDestroy() {  
        super.onDestroy();  
        if (loadAndStoreDataObservable != null) {  
           loadAndStoreDataObservable.unsubscribe();  
        }
    }