使用 Jersey RxJava 客户端未找到 class rx.Observable 的序列化程序
No serializer found for class rx.Observable using Jersey RxJava client
我正在创建一个 API 资源,它使用 Jersey RxJava 客户端将其他资源聚合到一个响应中。但是,我收到一个有点令人费解的错误返回。返回的对象是一个 JSONArray,但我得到这个:
No serializer found for class rx.Observable and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) ) (through reference chain: org.json.simple.JSONArray[0])
和方法:
@Override
public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
List<JSONObject> list = //query persistence layer
final Queue<String> errors = new ConcurrentLinkedQueue<>();
final List<Observable> observables = list.stream()
.map(jsonObject -> (String) jsonObject.get("href"))
.map(link -> dashboard(token, link, errors))
.collect(Collectors.toList());
Observable.just(new JSONArray())
.zipWith(observables, (jsonArray, resultFromObservable) -> {
jsonArray.add(resultFromObservable);
return jsonArray;
})
.subscribe(async::resume, async::resume);
}
知道是什么原因造成的吗?
因此,您的 observables
列表的类型为 List<Observable>
。由此我假设 dashboard
方法 returns 一个 Observable<?>
;进一步假设它 returns a Observable<T extends JSONNode>
,我将重写你的方法如下:
@Override
public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
List<JSONObject> list = //query persistence layer
final Queue<String> errors = new ConcurrentLinkedQueue<>();
Observable
.fromIterable(list)
.map(jsonObject -> (String) jsonObject.get("href"))
.flatMap(link -> dashboard(token, link, errors))
.collect(JSONArray::new, JSONArray::add)
.subscribe(async::resume, async::resume);
}
如果您使用以下行,zip 方法也可以工作:
.zipWith(Observable.merge(observables), (jsonArray, resultFromObservable) -> {
但是,zip()
从每个可观察对象中取出一个元素,这意味着您最终会得到一个大小为 1 的数组。
我正在创建一个 API 资源,它使用 Jersey RxJava 客户端将其他资源聚合到一个响应中。但是,我收到一个有点令人费解的错误返回。返回的对象是一个 JSONArray,但我得到这个:
No serializer found for class rx.Observable and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) ) (through reference chain: org.json.simple.JSONArray[0])
和方法:
@Override
public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
List<JSONObject> list = //query persistence layer
final Queue<String> errors = new ConcurrentLinkedQueue<>();
final List<Observable> observables = list.stream()
.map(jsonObject -> (String) jsonObject.get("href"))
.map(link -> dashboard(token, link, errors))
.collect(Collectors.toList());
Observable.just(new JSONArray())
.zipWith(observables, (jsonArray, resultFromObservable) -> {
jsonArray.add(resultFromObservable);
return jsonArray;
})
.subscribe(async::resume, async::resume);
}
知道是什么原因造成的吗?
因此,您的 observables
列表的类型为 List<Observable>
。由此我假设 dashboard
方法 returns 一个 Observable<?>
;进一步假设它 returns a Observable<T extends JSONNode>
,我将重写你的方法如下:
@Override
public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
List<JSONObject> list = //query persistence layer
final Queue<String> errors = new ConcurrentLinkedQueue<>();
Observable
.fromIterable(list)
.map(jsonObject -> (String) jsonObject.get("href"))
.flatMap(link -> dashboard(token, link, errors))
.collect(JSONArray::new, JSONArray::add)
.subscribe(async::resume, async::resume);
}
如果您使用以下行,zip 方法也可以工作:
.zipWith(Observable.merge(observables), (jsonArray, resultFromObservable) -> {
但是,zip()
从每个可观察对象中取出一个元素,这意味着您最终会得到一个大小为 1 的数组。