改造链接可观察对象

Retrofit chaining observables

我正在尝试使用 rxjava 进行改造。我在将改进的 Observables 彼此链接或与我创建的 Observables 链接时遇到问题。一些例子:

    Observable<List<Friend>> friendsListObservable = friendsService.getFriends();
    Observable<Void> updateReqestObservable = friendsListObservable.switchMap(friends -> {
        Log.d(TAG, "Hello");
        return userAPI.updateFriends(session.getUserId(), friends);
    }).subscribe();

一切都会被调用,直到它到达 switchMap。所以 hello 永远不会显示,但是如果我 return 例如 Observable.just(null) 而不是改造 observable 它工作正常。另外,如果我在没有链接的情况下使用改造 observable,它就可以工作。

编辑1: 这是一个 android 应用程序。似乎根本没有调用地图运算符。有时它也会发生在改造可观察量的情况下。我仍然认为它与线程有关。据我所知,当一个项目被发出时,一个操作符被调用,但是调用 onNext 不会触发 map 操作符。以下是我的全部代码:

public Observable<List<FacebookFriend>> getFriends() {
    PublishSubject<List<FacebookFriend>> friendsPublishSubject = PublishSubject.create();

    Observable<List<FacebookFriend>> returnObservable = friendsPublishSubject.doOnSubscribe(() -> {
    Log.d(TAG, "OnSubscribe called");
    Session session = Session.getActiveSession();

    if (session != null && session.isOpened()) {
        new Request(session, "/me/friends", null, HttpMethod.GET,
                new Request.Callback() {
                    public void onCompleted(Response response) {
                        JSONObject graphResponse = response.getGraphObject()
                                .getInnerJSONObject();
                        try {
                            JSONArray friends = graphResponse.getJSONArray("data");
                            Gson gson = new Gson();
                            Type listType = new TypeToken<ArrayList<FacebookFriend>>() {
                            }.getType();
                            List<FacebookFriend> friendsList = gson.fromJson(friends.toString(), listType);

                            friendsPublishSubject.onNext(friendsList);
                            friendsPublishSubject.onCompleted();
                        } catch (JSONException e) {
                            e.printStackTrace();
                            friendsPublishSubject.onError(e);
                        }
                    }
                }).executeAsync();
    } else {
        InvalidSessionException exception = new InvalidSessionException("Your facebook session expired");
        friendsPublishSubject.onError(exception);
    }
    });

    return returnObservable.subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}

public Observable<Void> updateFriendsList() {
    Observable<List<FacebookFriend>> facebookFriendsListObservable = facebookService.getFriends();

    Observable<Void> updateReqestObservable = facebookFriendsListObservable.map(friends -> {
    Log.d(TAG, "This is never called");
});

}

在实际调用点绕过阻塞的一种方法是将其订阅到一个主题,然后在代码的任何部分末尾阻塞该主题,要求执行请求。

例如:

final ReplaySubject<Void> subject = ReplaySubject.create();
friendsService.getFriends()
    .switchMap(friends -> userApi.updateFriends(session.getUserId(), friends))
    .subscribeOn(Schedulers.io())
    .subscribe(subject);

// Do other things that don't require this to be done yet...

// Wherever you need to wait for the request to happen:
subject.toBlocking().singleOrDefault(null);

由于主题也是 Observable,您甚至可以 return 方法中的主题,稍后再阻止它。

综上所述,将更新调用用作那里的副作用似乎有点奇怪。 updateFriends 也是 Retrofit 服务吗?如果是这样,您可能需要考虑使更新调用成为同步服务 returns void 而不是您将从 onNext 调用中调用的 Observable<Void> 。如果还需要阻塞,可以这样使用forEach

friendsService.getFriends()
    .forEach(friends -> { userApi.updateFriends(session.getUserId(), friends) });

forEachsubscribe 非常相似,除了它明确地阻止。您甚至可以将它与您的原始代码一起使用,但是添加一个空的 onNext 操作不会非常干净。

如果您还可以提供有关您的应用程序结构的更多详细信息(这都是在一个主要方法中吗?它是一个 Android 应用程序吗?等等)我可以提供更多关于避免阻塞的指示可能。