为什么 ConnectableFlux.connect() 阻塞?

Why does ConnectableFlux.connect() block?

我是 Spring Reactor 的新手。我一直在尝试了解 ConnectableFlux class 的工作原理。我已经阅读了文档并看到了在线发布的示例,但遇到了一个问题。

有人能告诉我为什么 connect() 方法会阻塞吗?我在文档中没有看到任何说明它应该阻塞的内容.. 特别是因为它 returns 是一次性的供以后使用。 鉴于我下面的示例代码,我永远不会通过 connect() 方法。

我试图基本上模拟我过去多次使用的旧式监听器界面范例。我想学习如何使用 Reactive 流重新创建 Service class & Listener 架构。我有一个简单的 Service class,它有一个名为“addUpdateListener(Listener l)”的方法,然后当我的服务 class "doStuff()" 方法触发一些事件传递给任何监听器。

我应该说我会写一个 API 供其他人使用,所以当我说服务 class 时,我并不是指 Spring 术语中的@Service。这将是一个普通的 java 单例 class.

我只是将 Spring Reactor 用于 Reactive Streams。我也在看 RxJava.. 但想看看 Spring Reactor Core 是否可以工作。

我从下面的测试 class 开始,只是为了了解库语法,然后卡在了阻塞问题上。

我想我要找的东西在这里有描述:Multiple Subscribers

UPDATE: 运行 我的代码通过调试器,ConnectableFlux connect 方法里面的代码,从来没有returns。它挂在内部连接方法上,永远不会 returns 来自该方法。

reactor.core.publisher.ConnectableFlux

public final Disposable connect() {
        Disposable[] out = new Disposable[]{null};
        this.connect((r) -> {
            out[0] = r;
        });
        return out[0];
    }

任何帮助都会很棒!

这也是我的 maven pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringReactorTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>classworlds:classworlds</exclude>
                                    <exclude>junit:junit</exclude>
                                    <exclude>jmock:*</exclude>
                                    <exclude>*:xml-apis</exclude>
                                    <exclude>org.apache.maven:lib:tests</exclude>
                                    <exclude>log4j:log4j:jar:</exclude>
                                </excludes>
                            </artifactSet>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

import java.util.concurrent.TimeUnit;

import static java.time.Duration.ofSeconds;

/**
 * Testing ConnectableFlux
 */
public class Main {

    private final static Logger LOG = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws InterruptedException {
        Main m = new Main();

        // Get the connectable
        ConnectableFlux<Object> flux = m.fluxPrintTime();

        // Subscribe some listeners
        // Tried using a new thread for the subscribers, but the connect call still blocks
        LOG.info("Subscribing");
        Disposable disposable = flux.subscribe(e -> LOG.info("Fast 1 - {}", e));
        Disposable disposable2 = flux.subscribe(e -> LOG.info("Fast 2 - {}", e));

        LOG.info("Connecting...");
        Disposable connect = flux.connect();// WHY does this block??
        LOG.info("Connected..");

        // Sleep 5 seconds
        TimeUnit.SECONDS.sleep(5);

        // Cleanup - Remove listeners
        LOG.info("Disposing");
        connect.dispose();
        disposable.dispose();
        disposable2.dispose();
        LOG.info("Disposed called");
    }

    // Just create a test flux
    public ConnectableFlux<Object> fluxPrintTime() {
        return Flux.create(fluxSink -> {
            while (true) {
                fluxSink.next(System.currentTimeMillis());
            }
        }).doOnSubscribe(ignore -> LOG.info("Connecting to source"))
                .sample(ofSeconds(2))
                .publish();
    }
}

运行 上面的代码给出了以下输出..它只是以毫秒为单位打印时间,直到我 Ctrl-C 这个过程..

09:36:21.463 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
09:36:21.478 [main] INFO Main - Subscribing
09:36:21.481 [main] INFO Main - Connecting...
09:36:21.490 [main] INFO Main - Connecting to source
09:36:23.492 [parallel-1] INFO Main - Fast 1 - 1589808983492
09:36:23.493 [parallel-1] INFO Main - Fast 2 - 1589808983492
09:36:25.493 [parallel-1] INFO Main - Fast 1 - 1589808985493
09:36:25.493 [parallel-1] INFO Main - Fast 2 - 1589808985493
09:36:27.490 [parallel-1] INFO Main - Fast 1 - 1589808987490
09:36:27.490 [parallel-1] INFO Main - Fast 2 - 1589808987490
09:36:29.493 [parallel-1] INFO Main - Fast 1 - 1589808989493
...

我收到了 Spring Reactor 团队的答复,我只是将其张贴在这里以防其他人遇到此问题...

The crux of the issue is that you're entering an infinite loop in Flux.create. The moment the flux gets subscribed, it will enter the loop and never exit it, producing data as fast as the CPU can. With Flux.create you should at least have a call to sink.complete() at some point.

I suggest to experiment with eg. Flux.interval as a source for your regular ticks, it will get rid of that extraneous complexity of Flux.create, which puts you in charge of lower level concepts of Reactive Streams (the onNext/onComplete/onError signals, that you'll need to learn about, but maybe not just right now ).

As a side note, I would take into consideration that emulating a listener-based API with Reactor (or RxJava) is not doing justice to what reactive programming can do. It is a constrained use case that will probably drive your focus and expectations away from the real benefits of reactive programming

From a higher perspective:

The broad idea of ConnectableFlux#connect() is that you have a "transient" source that you want to share between multiple subscribers, but it gets triggered the moment someone subscribes to it. So in order not to miss any event, you turn the source into a ConnectableFlux, perform some set up (subscribe several subscribers) and manually trigger the source (by calling connect()). It is not blocking, and returns a Disposable` that represents the upstream connection (in case you also want to manually cancel/dispose the whole subscription).

PS: Bismuth is now clearly outdated, prefer using the latest Dysprosium release train