RxJava 可观察对象不发出事件

RxJava observables not emitting events

我的 android 应用程序有一个显示 Feed 详细信息的 FeedDetailFragment。提要具有基本信息和元数据,可通过对服务器的两次单独调用来检索。服务端界面填充Retrofit。我已经实现了一些东西,就我的新手 Rx 知识而言,看起来合乎逻辑。但是,您可能已经猜到了,它不起作用。

外部 classes:

相关FeedDetailFragment代码:

public class FeedDetailFragment extends Fragment implements OnMapReadyCallback {
    public static final String ARG_FEED_ID       = "feed_id";
    public static final String ARG_FEED_INFO     = "feed_info";
    public static final String ARG_FEED_METADATA = "feed_metadata";
    public static final int INVALID_FEED_ID      = -1;

    ...

    private class PlaceFeedSubscriber extends Subscriber<Pair<GoogleMap, Feed>> {
        @Override
        public void onNext(Pair<GoogleMap, Feed> pair) {
            Log.i(TAG, String.format("Placing feed %d on [%f, %f] onto map %s",
                    pair.second.getInfo(),
                    pair.second.getMetadata().getSensorLatitude(),
                    pair.second.getMetadata().getSensorLongitude(),
                    pair.first.getMapType()));

            pair.first.addMarker(new MarkerOptions()
                    .position(new LatLng(
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[1],
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[0]))
                    .title("Marker"));
            mMapAPI.moveCamera(CameraUpdateFactory.newLatLngZoom(
                    new LatLng(
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[1],
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[0])
                    , 15));
        }

        @Override
        public void onCompleted() {
            Log.i(TAG, "Completed drawing of feed");
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "Drawing of feed failed with: " + e);
        }
    }

    public FeedDetailFragment() {
        mMapObservable = Observable.empty().subscribeOn(Schedulers.io());
        mFeedIdObservable = Observable.empty().subscribeOn(Schedulers.io());
        mFeedInfoObservable = Observable.empty();
        mFeedMetadataObservable = Observable.empty();

        // Start fetching new feed information
        mFeedIdObservable.doOnEach(new Action1<Integer>() {
            @Override
            public void call(Integer feedId) {
                Log.d(TAG, "Got a new feed id - " + feedId);
                mFeedInfoObservable.mergeWith(mUvClient.getFeed(feedId));
            }
        });

        // Start fetching new feed metadata
        mFeedInfoObservable.doOnEach(new Action1<FeedInfo>() {
            @Override
            public void call(FeedInfo feedInfo) {
                Log.d(TAG, "Got a new feed info - " + feedInfo.getTitle());
                mFeedMetadataObservable.mergeWith(mUvClient.getFeedMetadata(feedInfo.getId()));
            }
        });

        // Produce a new feed
        mFeedObservable = Observable.combineLatest(mFeedInfoObservable, mFeedMetadataObservable, new Func2<FeedInfo, FeedMetadata, Feed>() {
            @Override
            public Feed call(FeedInfo feedInfo, FeedMetadata feedMetadata) {
                return new Feed(feedInfo, feedMetadata);
            }
        });

        // Render the feed onto map
        Observable.combineLatest(mFeedObservable, mMapObservable, new Func2<Feed, GoogleMap, Pair<GoogleMap, Feed>>() {
            @Override
            public Pair<GoogleMap, Feed> call(Feed feed, GoogleMap map) {
                return new Pair(map, feed);
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new PlaceFeedSubscriber());
    }

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        Bundle arguments = getArguments();

        if (arguments.containsKey(ARG_FEED_ID)) {
            setFeed(arguments.getInt(ARG_FEED_ID));
        }
        else if (arguments.containsKey(ARG_FEED_INFO)) {
            if (arguments.containsKey(ARG_FEED_METADATA)) {
                setFeed((FeedInfo)Parcels.unwrap(arguments.getParcelable(ARG_FEED_INFO)),
                        (FeedMetadata)Parcels.unwrap(arguments.getParcelable(ARG_FEED_METADATA)));
            }
            else {
                setFeed((FeedInfo)Parcels.unwrap(arguments.getParcelable(ARG_FEED_INFO)));
            }
        }
    }

    ...

    @Override
    public void onMapReady(GoogleMap googleMap) {
        mMapAPI = googleMap;

        mMapObservable.mergeWith(Observable.just(googleMap));
    }

    /**
     * Sets the feed ID to be shown in the fragment. This triggers the chain of fetching feed info
     * and feed metadata, finally displaying it on the map.
     * @param feedId ID of the feed to display in the fragment.
     */
    public void setFeed(int feedId) {
        Log.d(TAG, String.format("Setting new feed ID - %d", feedId));
        mFeedIdObservable.mergeWith(Observable.just(feedId));
    }

    /**
     * Sets feed info. This triggers fetching of feed metadata, finally displaying it on the map.
     * @param feedInfo Information of the feed to display on the map.
     */
    public void setFeed(FeedInfo feedInfo) {
        Log.d(TAG, String.format("Setting new feed info - %s", feedInfo.getTitle()));
        mFeedInfoObservable.mergeWith(Observable.just(feedInfo));
    }

    /**
     * Displays feed info on the map.
     * @param feedInfo Information of the feed to display on the map.
     * @param feedMetadata Metadata of the feed to display on the map.
     */
    public void setFeed(FeedInfo feedInfo, FeedMetadata feedMetadata) {
        Log.d(TAG, String.format("Setting new feed info and metadata - %s", feedInfo.getTitle()));
        mFeedObservable.mergeWith(Observable.just(new Feed(feedInfo, feedMetadata)));
    }
}

我看到的日志输出如下:

Setting new feed info - SampleFeed
Completed drawing of feed

我的总体想法是,当我合并 Observable 时,它​​们会发出新数据。一些 Observable 被创建为空的,因此它们不会发出任何东西,但我仍然可以使用它们。

潜在流量如下:

这就是我脑海中的想法。显然,这是错误的。我哪里出错了,我该如何解决?我喜欢一些指导,也许更通用的 ideology/methodology 教学方法。感谢您的任何建议!

更新 1

所以我一直在努力解决这个问题。阅读更多文档……很多东西要学。我已将 Observable.empty() 替换为 Observable.never()。从文档中,我读到 empty 不发出任何内容并完成,这不是我想要的。另一方面,never 不发出任何东西但不完成。因此,我可以将它用于我正在寻找的目的。仍然没有得到我想要的,但我希望,更近一步。

更新 2

掌握更多技巧。查看 .never().empty() 的来源,我发现前者不调用 .onNext() 而后者调用 .onComplete()。中间没有什么我可以选择的。开始四处寻找替代品。基本上,我的代码没有执行,因为在我之前的尝试中,observable 要么立即完成,要么从未继续调用 next。但是,一开始没有什么可以调用.onNext()的。因此,我需要一个占位符。

阅读更多文档,我遇到了 Subjects。特别是,PublishSubject 在订阅者订阅之前不会发出任何内容。这似乎是一个可行的解决方案。但是,订阅者必须直接订阅该主题。这似乎不适用于 .mergeWith() 这个主题。

不会放弃:)

更新 3

感谢@dwursteisen,我继续使用 PublishSubject 方法。这是更改的相关代码:

...

private PublishSubject<GoogleMap> mMapObservable = null;
private PublishSubject<Feed> mFeedObservable = null;
private PublishSubject<Integer> mFeedIdObservable = null;
private PublishSubject<FeedInfo> mFeedInfoObservable = null;
private PublishSubject<FeedMetadata> mFeedMetadataObservable = null;

...

public FeedDetailFragment() {
    mMapObservable = PublishSubject.create();
    mFeedObservable = PublishSubject.create();
    mFeedIdObservable = PublishSubject.create();
    mFeedInfoObservable = PublishSubject.create();
    mFeedMetadataObservable = PublishSubject.create();

    mMapObservable.subscribe(new Action1<GoogleMap>() {
        @Override
        public void call(GoogleMap googleMap) {
            mMapApi = googleMap;
        }
    });

    mFeedMetadataObservable.subscribe(new Action1<FeedMetadata>() {
        @Override
        public void call(FeedMetadata feedMetadata) {
            // no code
        }
    });

    mFeedObservable.subscribe(new Action1<Feed>() {
        @Override
        public void call(Feed feed) {
            // no code
        }
    });

    // Start fetching new feed information
    mFeedIdObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer feedId) {
            mUvClient.getFeed(feedId).subscribe(new Action1<FeedInfo>() {
                @Override
                public void call(FeedInfo feedInfo) {
                    mFeedInfoObservable.onNext(feedInfo);
                }
            });
        }
    });

    // Start fetching new feed metadata
    mFeedInfoObservable
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<FeedInfo>() {
                @Override
                public void call(FeedInfo feedInfo) {
                    mFeedTitle.setText(feedInfo.getTitle());

                    mUvClient.getFeedMetadata(feedInfo.getId()).subscribe(new Action1<FeedMetadata>() {
                        @Override
                        public void call(FeedMetadata feedMetadata) {
                            mFeedMetadataObservable.onNext(feedMetadata);
                        }
                    });
                }
            });

    // Produce a new feed
    Observable.combineLatest(mFeedInfoObservable, mFeedMetadataObservable, new Func2<FeedInfo, FeedMetadata, Feed>() {
        @Override
        public Feed call(FeedInfo feedInfo, FeedMetadata feedMetadata) {
            Feed feed = new Feed(feedInfo, feedMetadata);
            return feed;
        }
    }).subscribeOn(Schedulers.io()).subscribe(new Action1<Feed>() {
        @Override
        public void call(Feed feed) {
            mFeedObservable.onNext(feed);
        }
    });

    // Render the feed onto map
    Observable.combineLatest(mFeedObservable, mMapObservable, new Func2<Feed, GoogleMap, Pair<GoogleMap, Feed>>() {
        @Override
        public Pair<GoogleMap, Feed> call(Feed feed, GoogleMap map) {
            return new Pair(map, feed);
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new PlaceFeedSubscriber());
}

...

@Override
public void onMapReady(GoogleMap googleMap) {
    mMapObservable.onNext(googleMap);
}

/**
 * Sets the feed ID to be shown in the fragment. This triggers the chain of fetching feed info
 * and feed metadata, finally displaying it on the map.
 * @param feedId ID of the feed to display in the fragment.
 */
public void setFeed(int feedId) {
    mFeedIdObservable.onNext(feedId);
}

/**
 * Sets feed info. This triggers fetching of feed metadata, finally displaying it on the map.
 * @param feedInfo Information of the feed to display on the map.
 */
public void setFeed(FeedInfo feedInfo) {
    mFeedInfoObservable.onNext(feedInfo);
}

/**
 * Displays feed info on the map.
 * @param feedInfo Information of the feed to display on the map.
 * @param feedMetadata Metadata of the feed to display on the map.
 */
public void setFeed(FeedInfo feedInfo, FeedMetadata feedMetadata) {
    mFeedObservable.onNext(new Feed(feedInfo, feedMetadata));
}

显然,既然我已经掌握了基础知识,我将仔细研究它并正确处理错误、缓存和其他条件。但是,我确实有一个问题:是否有任何方法可以简化以下代码以直接使用 Retrofit observable 而不是在订阅内部订阅它...也许 Rx 运算符会将其解析注入 mFeedInfoObservable?

mFeedIdObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer feedId) {
        mUvClient.getFeed(feedId).subscribe(new Action1<FeedInfo>() {
            @Override
            public void call(FeedInfo feedInfo) {
                mFeedInfoObservable.onNext(feedInfo);
            }
        });
    }
});

此外,我很想听听对一般方法的任何评论。我仍然在思考 Rx,我的实现不是最好的,我敢肯定。

 mFeedInfoObservable = Observable.empty();

你构建了一个空的 Observable,它永远不会发出值。因此,当您订阅此 Observable 时,您只会收到订阅完成的通知。

mFeedInfoObservable.mergeWith(Observable.just(feedInfo));

Observable 是不可变的。这意味着调用一个方法不会改变它的状态。 mergeWith 将产生一个新的 Observable,这是 Observable 与另一个合并的结果。

因此,在您的情况下,您构建了一个未使用的新 Observable

根据您的代码,您似乎需要 Subject(就像您提到的:PublishSubject)从不同的用户调用发出值。

private final Subject<Integer, Integer> subject = PublishSubject.create();


public void setFeed(int feedId) {
    subject.onNext(feedId);
}

public FeedDetailFragment() {
    subject.flatMap(feedId -> mUvClient.getFeed(feedId))
           .subscribe(/**...**/);
}

请注意,doOnNext 应用于副作用调用(即:将更改 Observable 之外的元素的代码,例如日志记录,...)。我认为在您的情况下,您可能需要其他运算符,例如 flatMapzip、...,以便像您想要实现的那样组合结果。