Java Reactor onComplete 不一致

Java Reactor onComplete inconsistency

我确定我只是遗漏了一些东西。我是运行以下代码:

@Test
public void simpleCreation() throws Exception {
    Iterator<String> data = ImmutableList.of("1", "2", "3").iterator();
    Flux<String> stringFlux = Flux.create(emmiter -> {
        while ( data.hasNext() ) {
            emmiter.next(data.next());
        }
        emmiter.complete();
    });
    ConnectableFlux<String> connectableFlux = stringFlux.publish();

    connectableFlux.doOnComplete(() -> System.out.println("connectableFlux.doOnComplete"));
    stringFlux.doOnComplete(() -> System.out.println("stringFlux.doOnComplete"));

    CountDownLatch completeLatch = new CountDownLatch(1);
    Disposable disposable = connectableFlux.subscribe(s -> {
        System.out.println("subscribe: data: " + s);
    }, error -> { }, completeLatch::countDown);

    connectableFlux.connect();

    completeLatch.await();
    disposable.dispose();
}

并期望它打印 "connectableFlux.doOnComplete" 或 "stringFlux.doOnComplete" 或两者,但我都没有看到。订阅方法的 OnComplete 回调执行没有问题,但这些方法都没有调用,我不太明白为什么。

对我来说,它看起来有点不一致 - 在一个地方调用了回调,而其他地方则被忽略了。我可以用 doOnNext 观察到类似的行为。

如果有人能解释一下这背后的概念,我将不胜感激。我确定这不是错误,只是我缺少关于框架或一般概念的东西。

这一行导致了问题:

connectableFlux.doOnComplete(() -> System.out.println("connectableFlux.doOnComplete"));

调用doOnComplete()的结果被忽略。方法 returns 您要在其上调用 subscribe() 的 Flux 实例的新版本,它不会将逻辑添加到旧的 connectableFlux 实例。

像这样尝试:

Iterator<String> data = ImmutableList.of("1", "2", "3").iterator();
Flux<String> stringFlux = Flux.create(emmiter -> {
    while (data.hasNext()) {
        emmiter.next(data.next());
    }
    emmiter.complete();
});

stringFlux.doOnComplete(() -> System.out.println("stringFlux.doOnComplete()"))
        .subscribe(s -> System.out.println("subscribe: data: " + s), error -> {})
        .dispose();

stringFlux.publish().connect();

我无法将此添加为评论,很抱歉碰到一个老问题。只是想分享一下 Reactor 官方指南:

B.2. I used an operator on my Flux but it doesn’t seem to apply. What gives? Make sure that the variable you .subscribe() to has been affected by the operators you think should have been applied to it.

Reactor operators are decorators. They return a different instance that wraps the source sequence and add behavior. That is why the preferred way of using operators is to chain the calls.

Compare the following two examples:

without chaining (incorrect)

Flux<String> flux = Flux.just("foo", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); 
flux.subscribe(next -> System.out.println("Received: " + next));
The mistake is here. The result isn’t attached to the flux variable.

without chaining (correct)

Flux<String> flux = Flux.just("foo", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));

This sample is even better (because it’s simpler):

with chaining (best)

Flux<String> secrets = Flux
  .just("foo", "chain")
  .map(secret -> secret.replaceAll(".", "*"))
  .subscribe(next -> System.out.println("Received: " + next));

The first version will output:

Received: foo
Received: chain

Whereas the two other versions will output the expected:

Received: ***
Received: *****

https://projectreactor.io/docs/core/release/reference/#faq.chain