Rxjava:发生切换映射时发出减少值

Rxjava : Emit reduce value when switchmap happens

reduce 运算符在可观察对象的末尾(完成时)发出值。

我正在寻找一种在 switchmap 中使用 reduce 的方法。我想要当外部可观察值发出值或完成时无限内部可观察值的sum

@Test
public void emit_value_when_switchmap() throws InterruptedException {

    Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
            .switchMapMaybe(
                    l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                            .reduce(Long::sum)
                            .map(a -> a + ": Final")
            )
            .subscribe(e -> System.out.println(e));


    Thread.sleep(10000);
}

此图说明了所需的行为:

//events: --------x-----1----2---1---x-----3--0--------x-1---1----|  
//result: ---------------------------4-----------------3----------2  

这可能不是最好的方法,但它暂时可以完成工作,直到有人想出更高级的方法来解决您的 use-case。

请看我的测试,我认为它解决了你的问题:

环境:(gradle -- groovy)

implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"

测试 3 次发射是从可观察到的源发出的。每次发出新值时,都会订阅内部可观察对象。当发出新值时,内部可观察对象完成并将值推送到下游。然后新发出的值将通过订阅一个新的 inner-stream.

来处理
  @Test
  public void takeWhileReduce() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Long> publish = source.publish(
        multicast -> {
          return multicast.flatMap(
              o -> {
                return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
                    .takeUntil(multicast)
                    .reduce(Long::sum)
                    .toObservable();
              },
              1);
        });

    TestObserver<Long> test = publish.test();

    source.onNext(42);

    scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);
    // assert - values emitted: 0,1,2,3
    test.assertValuesOnly(6L);

    // next value is flatMapped
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1,2
    test.assertValuesOnly(6L, 3L);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1
    test.assertValuesOnly(6L, 3L, 1L);
  }