Jersey RxJava 客户端和资源端点集合

Jersey RxJava client and collection of resource endpoints

我对 RxJava 非常陌生,我仍在尝试理解数据流,但我想知道这是否可行,然后再深入兔子洞。

目标: 解耦 API 资源并允许它们分布,并为客户端 (web/mobile) 提供灵活性以获取基于 href link 的数据仅提供给少数 'known' API 个端点。

我从一个 API 中得到了 JSON 个对象的集合,其中每个 json 都有一个 'href' 字段,为API 可以在其中检索数据的完整详细信息的资源。例如:

{
   {
      name: "assassins_creed",
      href: "games/assassins_creedId123"
   },
   {
      name: "bioshock",
      href: "games/bioshockId456"
   },
   {
      name: "clean_code",
      href: "books/clean_codeId789"
   },
   {
      name: "christmas",
      href: "events/xmas001"
   }
}

检索此集合后,我想使用该 href 值调用适当的资源并将结果存储在集合中,例如 JSON 数组。

我正在将 JSON 对象列表中的字符串映射到另一个列表中,然后我试图将它们添加到一个 Observable 中,以便我可以将每个资源异步检索到同一响应中。

我一直在使用 Jersey example 作为参考,我担心它无法采用动态 href links 和 return 我想要的响应。

到目前为止我停在了:

@Override
public List<JSONObject> getUsersDashboard(TokenModel token, String userId, @Suspended final AsyncResponse async) {
    List<JSONObject> list = //Database request to get items for userId
    List<String> hrefLinks = list.stream()
            .map(e -> (String) e.get("href"))
            .collect(Collectors.toList());

    final Queue<String> errors = new ConcurrentLinkedQueue<>();

    Observable.just(new JSONArray())
            .zipWith(dashboard(token.getAccessToken(),"plhUrl", errors), (array, objects) -> {
                array.add(objects);
                return array;
            })
            .subscribe(response -> {
                //errors?
                async.resume(response);
            }, async::resume);
}

private Observable<List<JSONObject>> dashboard(String access, String urlFragment, Queue<String> errors) {
     Client client = ClientBuilder.newClient();
     return RxObservable.from(client).target(urlFragment).request()
            .header("Authorization", access)
            .rx()
            .get(new GenericType<List<JSONObject>>(){})
            .onErrorReturn(throwable -> {
                errors.offer(throwable.getMessage());
                return Collections.emptyList();
            });
}

我犹豫是否要将其放入 for 循环中为每个 hrefLinks 条目创建 Observables。

如何将来自多个资源的响应转化为对客户端的单一连贯响应?

flatMap 运算符就是您要找的东西。您可以在流中发出 href links,以便在 flatMap 运算符中将每个 link 转换为一个新的可观察对象,并将其结果传播到原始流中。我认为这正是您所描述的行为。

编辑: Retrofit 库提供了很好的接口来从 links 创建可观察对象。要详细说明您的示例,您首先需要使用 map 运算符将集合对象转换为某种 Iterable 。一旦你有一个可交互的,使用 flatMap 创建一个可观察的项目,这些项目属于使用 Observable.from() 的 Iterable。这将发出 links 作为单独的发射。然后,您再次使用 flatMap 将每个 link 转换为可观察的网络请求,将发射转发回原始流。

getCollectionObjectObservable()
  .map(result -> iterable) // transform the object into Iterable( ie. List ) 
  .flatMap(iterable -> Observable.from(iterable))
  .flatMap(link -> createRequestObservable(link))
  .subscribe(result -> /* process the result of each item here */ )