RxAndroid 间隔与 FlatMap 触发 CALL 两次而不是一次

RxAndroid interval with FlatMap trigger CALL twice instead once

代码

 heartBeatSub = Observable.interval(HEARTBEAT_INTERVAL, TimeUnit.SECONDS)
            .flatMap(new Func1<Long, Observable<Notification<Response>>>() {
                @Override
                public Observable<Notification<Response>> call(Long aLong) {
                    return api.requestHeartBeat(vehicleId).materialize();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Notification<Response>>() {
                @Override
                public void call(Notification<Response> responseNotification) {
                    Log.i("HEARTBEAT_INTERVAL", "Response from HEARTBEAT");
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    // TODO: 22/03/16 ADD ERROR HANDLING
                }
            });

问题

我的 call 方法每个间隔触发两次而不是一次。

03-22 11:57:47.236 28078-28078/com.app I/HEARTBEAT_INTERVAL: Response from HEARTBEAT
03-22 11:57:47.876 28078-28078/com.app I/HEARTBEAT_INTERVAL: Response from HEARTBEAT

我携带的第一个:Method threw 'java.lang.NullPointerException' exception. Cannot evaluate rx.Notification.toString() 并称为 onNext.

第二个正常Response。 并称为 onCompleted

========固定代码==========

在@Daniel Lew 的帮助下,我修复了我的代码,现在它可以正常工作了

  private void triggerHeartBeat(final String vehicleId) {
    heartBeatSub = Observable.interval(HEARTBEAT_INTERVAL, TimeUnit.MINUTES)
            .flatMap(new Func1<Long, Observable<Response>>() {
                @Override
                public Observable<Response> call(Long aLong) {
                    return api.requestHeartBeat(vehicleId);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Response>() {
                @Override
                public void call(Response response) {
                    Log.i("HEARTBEAT_INTERVAL", "Response from HEARTBEAT");
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    // TODO: 22/03/16 ADD ERROR HANDLING
                }
            });

你不应该打电话给 materialize()。它将所有通知(onNext()onCompleted()onError())提升到它们自己的 onNext() 调用中,这会产生负面影响。

flatMap() 中的每个 API 请求都是一个完整的 Observable,这意味着它同时调用了 onNext(response)onCompleted()。通常 flatMap() 不会转发 onCompleted()(因为 interval() 尚未完成),但由于您调用 materialize(),所有通知都会转发给订阅者。

换句话说,您得到的是:

  1. 通知(onNext(包含您的回复))
  2. 通知(onCompleted)

如果你没有使用 materialize() 你会得到你想要的:

  1. onNext(包含您的回复)。