我是否通过将可观察对象转换为阻塞可观察对象来滥用 rxJava?

Am I misusing rxJava by converting an observable into a blocking observable?

我的 API 对两个独立的服务进行了大约 100 次下游调用,成对进行。在我可以 return 我对客户的回复之前,需要汇总所有回复。我使用 hystrix-feign 进行 HTTP 调用。

我想出了我认为是一个优雅的解决方案,直到 rxJava docs 我发现了以下内容

BlockingObservable is a variety of Observable that provides blocking operators. It can be useful for testing and demo purposes, but is generally inappropriate for production applications (if you think you need to use a BlockingObservable this is usually a sign that you should rethink your design).

我的代码大致如下

List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
    Observable<C> zipped = Observable.zip(
         feignClientA.sendRequest(request.A()),
         feignClientB.sendRequest(request.B()),
         (a, b) -> new C(a,b));
    observables.add(zipped);
}

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();

Observable
    .merge(observables)
    .toBlocking()
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));

return apiResponse;

基于此设置的几个问题:

  1. 根据我的用例,toBlocking() 是否合理
  2. 在主线程到达 forEach() 之前不会进行实际的 HTTP 调用,我的理解是否正确
  3. 我看到 forEach() 块中的代码由不同的线程执行,但我无法验证 forEach() 块中是否可以有多个线程。是否并发执行?

更好的选择是 return Observable 由其他运算符使用,但您可能会避开阻塞代码(但是,它应该 运行 在后台线程上.)

public Observable<D> getAll(Iterable<RequestPair> requests) {
    return Observable.from(requests)
    .flatMap(request ->
        Observable.zip(
            feignClientA.sendRequest(request.A()),
            feignClientB.sendRequest(request.B()),
            (a, b) -> new C(a,b)
        )
    , 8)  // maximum concurrent HTTP requests
    .map(both -> doSomeWork(both));
}

// for legacy users of the API
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) {
    return getAll(requests)
        .toList()
        .toBlocking()
        .first();
}

Am I correct in understanding that the actual HTTP calls do not get made until the main thread gets to the forEach()

是的,forEach 触发了整个操作序列。

I've seen that the code in the forEach() block is executed by different threads, but I was not able to verify if there can be more than one thread in the forEach() block. Is the execution there concurrent?

一次只允许一个线程执行 forEach 中的 lambda,但您确实可能会看到不同的线程进入那里。