为什么不调用 doOnNext()?
Why doesn't doOnNext() get called?
我的订户在下面没有调用 onNext() 和 onCompleted()。我已经尝试通过 doOnNext()/doOnTerminate() 实现订阅者。我也试过 doAfterTerminate()。我也试过显式定义一个订阅者,但 onNext() 和 onCompleted() 都没有被调用。
根据 ,reduce() 没有终止,所以我尝试添加 take(1),但没有成功。在同一个 Whosebug 问题中,有人说问题可能是我的流永远不会关闭。除了 take(1) 之外,也许还有其他方法我应该关闭流,但我对 ReactiveX 的理解还不够好。
根据 ,可能是系列中的原始流没有终止。但是我不明白为什么如果我调用 take(1) 会很重要,我认为它应该发出终止信号。
基本上,为什么不执行以下行?
System.out.println("doOnNext map.size()=" + map.size());
即使下面这行代码被执行了 98 次:
map.put(e.getKey(), e.getValue());
JsonObjectObservableRequest.java
import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {
public JsonObjectObservableRequest(int method, String url, JSONObject request) {
jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(), getResponseErrorListener());
}
private Response.Listener<JSONObject> getResponseListener() {
return new Response.Listener<JSONObject>() {
@Override
public void onResponse(JSONObject response) {
publishSubject.onNext(Observable.just(response));
}
};
}
private Response.ErrorListener getResponseErrorListener() {
return new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
Observable<JSONObject> myError = Observable.error(error);
publishSubject.onNext(myError);
}
};
}
public JsonObjectRequest getJsonObjectRequest() {
return jsonObjectRequest;
}
public Observable<JSONObject> getObservable() {
return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable< JSONObject>>() {
@Override
public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
return jsonObjectObservable;
}
});
}
}
JsonObjectObservableRequest调用代码
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
.map(json -> {
try {
return NetworkAccountIdDatasource.parseIdJSON(json);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
})
.flatMapIterable(x -> x)
.map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
.reduce(new HashMap<String, String>(), (map, e) -> {
map.put(e.getKey(), e.getValue());
return map;
})
.take(1)
.doOnNext(map -> {
System.out.println("doOnNext map.size()=" + map.size());
})
.doOnTerminate(() -> {
System.out.println("doOnTerminate");
})
.subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
您正确识别了问题,您没有在源网络 Observable
上调用 onCompleted()
(在 JsonObjectObservableRequest
class),而 take(1)
也没有帮助,因为你把它放在 之前 reduce
.
正如您可能理解的那样,reduce 必须对 Observable
进行有限数量的排放操作,因为它会在所有项目都已排放后排放累积项目,因此它依赖于 onCompleted()
事件来了解观察到的时间流已结束。
- 我认为在你的情况下,最好的办法是改变
JsonObjectObservableRequest
的操作方式,为此你不需要 Subject
,你可以使用创建方法来包装回调(可以看我的回答here, and read more about bridging between callback world to RxJava here.
- 此外,您不需要发出
Observable
的东西,然后 flatmap
到项目,您可以简单地用 onNext()
发出项目并发出错误onError()
,实际上您是在隐藏错误并将它们转换为发射,这可能会使处理错误变得更加困难。
- 您应该在调用
onNext()
后在 onResponse()
调用 onCompleted()
以发出完成信号。如果出现错误,信号 onError
将通知流终止。
- 另一个问题是取消似乎没有以这种方式处理的请求,您可以阅读源代码以了解在包装 callbacl 调用时如何完成。
我找到了调用 doOnTerminate() 和 doOnNext() 的方法。它是将 take(1) 语句放在流处理中更高的位置。我是根据@yosriz 的评论得到这个想法的。我希望我明白为什么这能解决问题,但我不明白,因为我认为终止信号只会传播出去,但我想我还需要了解更多有关 ReactiveX / 反应式函数编程等的知识..
JsonObjectObservableRequest调用代码
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
.map(json -> {
try {
return NetworkAccountIdDatasource.parseIdJSON(json);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
})
.take(1) // SOLUTION: MOVED THIS UP FROM BELOW reduce()
.flatMapIterable(x -> x)
.map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
.reduce(new HashMap<String, String>(), (map, e) -> {
map.put(e.getKey(), e.getValue());
return map;
})
// .take(1) // SOLUTION: MOVE THIS ABOVE flatMapIterable()
.doOnNext(map -> {
System.out.println("doOnNext map.size()=" + map.size());
})
.doOnTerminate(() -> {
System.out.println("doOnTerminate");
})
.subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
此外,如果我将 take(1) 作为第一个操作,在第一个 map() 操作(调用 parseIdJSON() 的操作)之前,它也会起作用。但是,如果我将 take(1) 作为操作放在 reduce() 之前,它只能部分起作用;好事是 doOnNext() 被调用但坏事是 map.size() 总是 1,而我的数据集应该是 98。
我的订户在下面没有调用 onNext() 和 onCompleted()。我已经尝试通过 doOnNext()/doOnTerminate() 实现订阅者。我也试过 doAfterTerminate()。我也试过显式定义一个订阅者,但 onNext() 和 onCompleted() 都没有被调用。
根据
根据
基本上,为什么不执行以下行?
System.out.println("doOnNext map.size()=" + map.size());
即使下面这行代码被执行了 98 次:
map.put(e.getKey(), e.getValue());
JsonObjectObservableRequest.java
import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {
public JsonObjectObservableRequest(int method, String url, JSONObject request) {
jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(), getResponseErrorListener());
}
private Response.Listener<JSONObject> getResponseListener() {
return new Response.Listener<JSONObject>() {
@Override
public void onResponse(JSONObject response) {
publishSubject.onNext(Observable.just(response));
}
};
}
private Response.ErrorListener getResponseErrorListener() {
return new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
Observable<JSONObject> myError = Observable.error(error);
publishSubject.onNext(myError);
}
};
}
public JsonObjectRequest getJsonObjectRequest() {
return jsonObjectRequest;
}
public Observable<JSONObject> getObservable() {
return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable< JSONObject>>() {
@Override
public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
return jsonObjectObservable;
}
});
}
}
JsonObjectObservableRequest调用代码
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
.map(json -> {
try {
return NetworkAccountIdDatasource.parseIdJSON(json);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
})
.flatMapIterable(x -> x)
.map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
.reduce(new HashMap<String, String>(), (map, e) -> {
map.put(e.getKey(), e.getValue());
return map;
})
.take(1)
.doOnNext(map -> {
System.out.println("doOnNext map.size()=" + map.size());
})
.doOnTerminate(() -> {
System.out.println("doOnTerminate");
})
.subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
您正确识别了问题,您没有在源网络 Observable
上调用 onCompleted()
(在 JsonObjectObservableRequest
class),而 take(1)
也没有帮助,因为你把它放在 之前 reduce
.
正如您可能理解的那样,reduce 必须对 Observable
进行有限数量的排放操作,因为它会在所有项目都已排放后排放累积项目,因此它依赖于 onCompleted()
事件来了解观察到的时间流已结束。
- 我认为在你的情况下,最好的办法是改变
JsonObjectObservableRequest
的操作方式,为此你不需要Subject
,你可以使用创建方法来包装回调(可以看我的回答here, and read more about bridging between callback world to RxJava here. - 此外,您不需要发出
Observable
的东西,然后flatmap
到项目,您可以简单地用onNext()
发出项目并发出错误onError()
,实际上您是在隐藏错误并将它们转换为发射,这可能会使处理错误变得更加困难。 - 您应该在调用
onNext()
后在onResponse()
调用onCompleted()
以发出完成信号。如果出现错误,信号onError
将通知流终止。 - 另一个问题是取消似乎没有以这种方式处理的请求,您可以阅读源代码以了解在包装 callbacl 调用时如何完成。
我找到了调用 doOnTerminate() 和 doOnNext() 的方法。它是将 take(1) 语句放在流处理中更高的位置。我是根据@yosriz 的评论得到这个想法的。我希望我明白为什么这能解决问题,但我不明白,因为我认为终止信号只会传播出去,但我想我还需要了解更多有关 ReactiveX / 反应式函数编程等的知识..
JsonObjectObservableRequest调用代码
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
.map(json -> {
try {
return NetworkAccountIdDatasource.parseIdJSON(json);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
})
.take(1) // SOLUTION: MOVED THIS UP FROM BELOW reduce()
.flatMapIterable(x -> x)
.map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
.reduce(new HashMap<String, String>(), (map, e) -> {
map.put(e.getKey(), e.getValue());
return map;
})
// .take(1) // SOLUTION: MOVE THIS ABOVE flatMapIterable()
.doOnNext(map -> {
System.out.println("doOnNext map.size()=" + map.size());
})
.doOnTerminate(() -> {
System.out.println("doOnTerminate");
})
.subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
此外,如果我将 take(1) 作为第一个操作,在第一个 map() 操作(调用 parseIdJSON() 的操作)之前,它也会起作用。但是,如果我将 take(1) 作为操作放在 reduce() 之前,它只能部分起作用;好事是 doOnNext() 被调用但坏事是 map.size() 总是 1,而我的数据集应该是 98。