改造链接可观察对象
Retrofit chaining observables
我正在尝试使用 rxjava 进行改造。我在将改进的 Observables 彼此链接或与我创建的 Observables 链接时遇到问题。一些例子:
Observable<List<Friend>> friendsListObservable = friendsService.getFriends();
Observable<Void> updateReqestObservable = friendsListObservable.switchMap(friends -> {
Log.d(TAG, "Hello");
return userAPI.updateFriends(session.getUserId(), friends);
}).subscribe();
一切都会被调用,直到它到达 switchMap。所以 hello 永远不会显示,但是如果我 return 例如 Observable.just(null)
而不是改造 observable 它工作正常。另外,如果我在没有链接的情况下使用改造 observable,它就可以工作。
编辑1:
这是一个 android 应用程序。似乎根本没有调用地图运算符。有时它也会发生在改造可观察量的情况下。我仍然认为它与线程有关。据我所知,当一个项目被发出时,一个操作符被调用,但是调用 onNext
不会触发 map
操作符。以下是我的全部代码:
public Observable<List<FacebookFriend>> getFriends() {
PublishSubject<List<FacebookFriend>> friendsPublishSubject = PublishSubject.create();
Observable<List<FacebookFriend>> returnObservable = friendsPublishSubject.doOnSubscribe(() -> {
Log.d(TAG, "OnSubscribe called");
Session session = Session.getActiveSession();
if (session != null && session.isOpened()) {
new Request(session, "/me/friends", null, HttpMethod.GET,
new Request.Callback() {
public void onCompleted(Response response) {
JSONObject graphResponse = response.getGraphObject()
.getInnerJSONObject();
try {
JSONArray friends = graphResponse.getJSONArray("data");
Gson gson = new Gson();
Type listType = new TypeToken<ArrayList<FacebookFriend>>() {
}.getType();
List<FacebookFriend> friendsList = gson.fromJson(friends.toString(), listType);
friendsPublishSubject.onNext(friendsList);
friendsPublishSubject.onCompleted();
} catch (JSONException e) {
e.printStackTrace();
friendsPublishSubject.onError(e);
}
}
}).executeAsync();
} else {
InvalidSessionException exception = new InvalidSessionException("Your facebook session expired");
friendsPublishSubject.onError(exception);
}
});
return returnObservable.subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}
public Observable<Void> updateFriendsList() {
Observable<List<FacebookFriend>> facebookFriendsListObservable = facebookService.getFriends();
Observable<Void> updateReqestObservable = facebookFriendsListObservable.map(friends -> {
Log.d(TAG, "This is never called");
});
}
在实际调用点绕过阻塞的一种方法是将其订阅到一个主题,然后在代码的任何部分末尾阻塞该主题,要求执行请求。
例如:
final ReplaySubject<Void> subject = ReplaySubject.create();
friendsService.getFriends()
.switchMap(friends -> userApi.updateFriends(session.getUserId(), friends))
.subscribeOn(Schedulers.io())
.subscribe(subject);
// Do other things that don't require this to be done yet...
// Wherever you need to wait for the request to happen:
subject.toBlocking().singleOrDefault(null);
由于主题也是 Observable
,您甚至可以 return 方法中的主题,稍后再阻止它。
综上所述,将更新调用用作那里的副作用似乎有点奇怪。 updateFriends
也是 Retrofit 服务吗?如果是这样,您可能需要考虑使更新调用成为同步服务 returns void
而不是您将从 onNext 调用中调用的 Observable<Void>
。如果还需要阻塞,可以这样使用forEach
:
friendsService.getFriends()
.forEach(friends -> { userApi.updateFriends(session.getUserId(), friends) });
forEach
与 subscribe
非常相似,除了它明确地阻止。您甚至可以将它与您的原始代码一起使用,但是添加一个空的 onNext
操作不会非常干净。
如果您还可以提供有关您的应用程序结构的更多详细信息(这都是在一个主要方法中吗?它是一个 Android 应用程序吗?等等)我可以提供更多关于避免阻塞的指示可能。
我正在尝试使用 rxjava 进行改造。我在将改进的 Observables 彼此链接或与我创建的 Observables 链接时遇到问题。一些例子:
Observable<List<Friend>> friendsListObservable = friendsService.getFriends();
Observable<Void> updateReqestObservable = friendsListObservable.switchMap(friends -> {
Log.d(TAG, "Hello");
return userAPI.updateFriends(session.getUserId(), friends);
}).subscribe();
一切都会被调用,直到它到达 switchMap。所以 hello 永远不会显示,但是如果我 return 例如 Observable.just(null)
而不是改造 observable 它工作正常。另外,如果我在没有链接的情况下使用改造 observable,它就可以工作。
编辑1:
这是一个 android 应用程序。似乎根本没有调用地图运算符。有时它也会发生在改造可观察量的情况下。我仍然认为它与线程有关。据我所知,当一个项目被发出时,一个操作符被调用,但是调用 onNext
不会触发 map
操作符。以下是我的全部代码:
public Observable<List<FacebookFriend>> getFriends() {
PublishSubject<List<FacebookFriend>> friendsPublishSubject = PublishSubject.create();
Observable<List<FacebookFriend>> returnObservable = friendsPublishSubject.doOnSubscribe(() -> {
Log.d(TAG, "OnSubscribe called");
Session session = Session.getActiveSession();
if (session != null && session.isOpened()) {
new Request(session, "/me/friends", null, HttpMethod.GET,
new Request.Callback() {
public void onCompleted(Response response) {
JSONObject graphResponse = response.getGraphObject()
.getInnerJSONObject();
try {
JSONArray friends = graphResponse.getJSONArray("data");
Gson gson = new Gson();
Type listType = new TypeToken<ArrayList<FacebookFriend>>() {
}.getType();
List<FacebookFriend> friendsList = gson.fromJson(friends.toString(), listType);
friendsPublishSubject.onNext(friendsList);
friendsPublishSubject.onCompleted();
} catch (JSONException e) {
e.printStackTrace();
friendsPublishSubject.onError(e);
}
}
}).executeAsync();
} else {
InvalidSessionException exception = new InvalidSessionException("Your facebook session expired");
friendsPublishSubject.onError(exception);
}
});
return returnObservable.subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}
public Observable<Void> updateFriendsList() {
Observable<List<FacebookFriend>> facebookFriendsListObservable = facebookService.getFriends();
Observable<Void> updateReqestObservable = facebookFriendsListObservable.map(friends -> {
Log.d(TAG, "This is never called");
});
}
在实际调用点绕过阻塞的一种方法是将其订阅到一个主题,然后在代码的任何部分末尾阻塞该主题,要求执行请求。
例如:
final ReplaySubject<Void> subject = ReplaySubject.create();
friendsService.getFriends()
.switchMap(friends -> userApi.updateFriends(session.getUserId(), friends))
.subscribeOn(Schedulers.io())
.subscribe(subject);
// Do other things that don't require this to be done yet...
// Wherever you need to wait for the request to happen:
subject.toBlocking().singleOrDefault(null);
由于主题也是 Observable
,您甚至可以 return 方法中的主题,稍后再阻止它。
综上所述,将更新调用用作那里的副作用似乎有点奇怪。 updateFriends
也是 Retrofit 服务吗?如果是这样,您可能需要考虑使更新调用成为同步服务 returns void
而不是您将从 onNext 调用中调用的 Observable<Void>
。如果还需要阻塞,可以这样使用forEach
:
friendsService.getFriends()
.forEach(friends -> { userApi.updateFriends(session.getUserId(), friends) });
forEach
与 subscribe
非常相似,除了它明确地阻止。您甚至可以将它与您的原始代码一起使用,但是添加一个空的 onNext
操作不会非常干净。
如果您还可以提供有关您的应用程序结构的更多详细信息(这都是在一个主要方法中吗?它是一个 Android 应用程序吗?等等)我可以提供更多关于避免阻塞的指示可能。