rxjava 将 2 个调用与错误处理相结合,因延迟而失败

rxjava combines 2 calls with error handling, fails with delay

用例是, 有 2 个数据来源:

  1. 服务 1 - 从 source-1 获取
  2. 服务 2 - 从 source-2 获取

应用程序应该 return 至少来自 source-1 的数据。如果 source-2 一切正常 - 数据将被“增强”,比如乘以 100。

服务 1 调用服务 2。

如果所有成功的用户都从service-1和service-2获取数据 如果服务 2 出现错误,用户只能从服务 1 获取数据(至少) 如果服务 1 出现错误 - 用户将收到错误消息。

有模拟此场景的 hello-world-bench 代码:

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;

class Response {

    public Integer value;
    public String warning;
    public Response(Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Response{" +
                "value=" + value +
                ", warning='" + warning + '\'' +

                '}';
    }
}

class Service1 {

    public Observable<Response> call(int arg) {
        return Observable
                .just(
                        new Response(1),
                        new Response(2),
                        new Response(3),
                        new Response(4))
                .delay(100, TimeUnit.MILLISECONDS);
    }
}

class Service2 {

    public Observable<Response> call(int arg) {

        if ( arg % 2 == 0) {

            System.out.println("service 2: " + arg);

            return Observable
                    .just(new Response(100 * arg)) // service 2 multiplies x 100 on the result it gets from the service 1 
                    .delay(10, TimeUnit.MILLISECONDS);

        } else {

            System.out.println("service 2: " + arg);

            return Observable.error(new RuntimeException("service 2 error"));
        }
    }
}

public class Step1 {

    static Service1 service1 = new Service1();
    static Service2 service2 = new Service2();

    public static void main(String[] args) throws InterruptedException {

        var oo1 = service1.call(1);

        var oo3 = oo1.switchMapDelayError(x -> {

            final Observable<Response> oo2 = service2.call(x.value);

            return oo2
                    .onErrorReturn((ex) -> {
                        //System.out.println("Error handling..." + ex.getMessage() + " " + x);
                        x.warning = ex.getMessage();
                        return x; // returns at least service1 result
                    });
        });

        oo3.subscribe(x -> {
            System.out.println(x);
        });


        Thread.sleep(100000);
    }

}

这段代码的结果是:

service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}

问题是:没有预期的:value=2002*100

然而,如果我在 service2.call() //.delay(10, TimeUnit.MILLISECONDS) 评论延迟,那么它会得到预期的结果:

service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
Response{value=200, warning='null'}
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}

问题是:为什么 .delay(10, TimeUnit.MILLISECONDS) on service2.call() 无法产生 value=200 ?该解决方案有什么问题,我错过了什么?

谢谢。

您的问题是 switchMapDelayError 运算符。您应该使用 concatMap 或 flatMap

我冒昧地为您的用例编写了一个测试。请注意,始终使用重载来提供 Scheduler 以便提供 TestScheduler 进行测试。

switchMap 有什么作用?

在每个上游发出时,switchMap 订阅给定的内部流。当上游发出新值时,旧的内部流将被取消订阅,并再次调用 switchMap 的 lambda 以订阅新的内部流。

问题可能是这段代码:

return Observable
            .just(
                    new Response(1),
                    new Response(2),
                    new Response(3),
                    new Response(4))
            .delay(100, TimeUnit.MILLISECONDS);

它几乎立即一个接一个地在堆栈上发出响应 1 到 4,并且每个发出都在另一个线程上延迟。因此 Response 1 到 4 几乎会立即发出。它们不会像这样发出:Response(1) at 100ms, Response(2) at 200ms, etc.

让我们看看输出是什么

Observable.just(
    new Response(1), //
    new Response(2),
    new Response(3),
    new Response(4))
    .delay(100, TimeUnit.MILLISECONDS)
    .subscribe(r -> {
      System.out.println("received value at " + Schedulers.io().now(TimeUnit.MILLISECONDS));
    });

输出

received value at 1607432032768
received value at 1607432032769
received value at 1607432032769
received value at 1607432032769

因此所有值几乎立即发出并用 switchMap 相互覆盖。先前发出的值几乎立即被新值抵消。

解决方案

使用 concatMap 或 flatMap 或更改您的测试设置以 100 毫秒的间隔发出每个值。

flatMap 仅订阅每个值,默认情况下最多订阅 128 个内部流。当内部流完成时,ConcatMap 只会订阅下一个值。

测试

public class So65193002 {
      @Test
      void so() {
        TestScheduler testScheduler = new TestScheduler();
        Service1 service1 = new Service1(testScheduler);
        Service2 service2 = new Service2(testScheduler);
    
        Observable<Response> service1Call = service1.call(1);
    
        Observable<Response> combined =
            service1Call.concatMapEagerDelayError(
                x -> {
                  return service2
                      .call(x.value)
                      .onErrorReturn(
                          (ex) -> {
                            x.warning = ex.getMessage();
                            return x; // returns at least service1 result
                          });
                },
                true);
    
        TestObserver<Response> test = combined.test();
    
        testScheduler.advanceTimeBy(1, TimeUnit.HOURS);
    
        test.assertValueCount(4)
            .assertValueAt(
                0,
                r -> {
                  assertThat(r.value).isEqualTo(1);
                  assertThat(r.warning).isNotEmpty();
                  return true;
                })
            .assertValueAt(
                1,
                r -> {
                  assertThat(r.value).isEqualTo(200);
                  assertThat(r.warning).isNull();
                  return true;
                })
            .assertValueAt(
                3,
                r -> {
                  assertThat(r.value).isEqualTo(400);
                  assertThat(r.warning).isNull();
                  return true;
                });
      }
    }

class Response {
  public Integer value;
  public String warning;

  public Response(Integer value) {
    this.value = value;
  }

  @Override
  public String toString() {
    return "Response{" + "value=" + value + ", warning='" + warning + '\'' + '}';
  }
}

class Service1 {
  private final Scheduler scheduler;

  Service1(Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  public Observable<Response> call(int arg) {
    return Observable.just(
            new Response(1), //
            new Response(2),
            new Response(3),
            new Response(4))
        .delay(100, TimeUnit.MILLISECONDS, scheduler);
  }
}

class Service2 {
  private final Scheduler scheduler;

  Service2(Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  public Observable<Response> call(int arg) {
    if (arg % 2 == 0) {
      return Observable.just(new Response(100 * arg)).delay(10, TimeUnit.MILLISECONDS, scheduler);

    } else {
      return Observable.error(new RuntimeException("service 2 error"));
    }
  }
}

备注

不要使用可变对象。始终确保发出的值是不可变的,否则你会遇到麻烦。