Spring 5 执行fluxSink.next时FluxSink不发送数据

Spring 5 FluxSink does not send data when fluxSink.next is executed

我有一个生成 Randon INT 并发送回客户端的 GET 方法示例

@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {

            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();


    });
}

卷曲:

curl -v -H "Accept: text/event-stream" -X GET 'http://localhost:8080/stream

使用此代码使用 CURL 命令时,我只能在 fluxSink.complete 发生时在我的客户端中看到数字。

现在,如果我将代码更改为:

@GetMapping
public Flux<String> search() {
    return Flux.create(fluxSink -> {
        Thread t = new Thread(() -> {
            Random r = new Random();
            int n;
            for (int i = 0; i < 10; i++) {
                n= r.nextInt(1000);
                System.out.println("Creating:"+n);
                fluxSink.next(String.valueOf(n));

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fluxSink.complete();
        });
        t.start();
    });
}

将进程包装到一个线程中,它工作正常。当 fluxSink.next 发生时,我可以看到数据传输正常。

谁能解释一下这个效果?如何在不显式使用线程的情况下查看数据流动?

谢谢!

Flux.create() 大约是 "emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API"。在您的情况下,您正在同步发射所有元素,并且您没有机会让 Reactor 工作,因为在完成所有操作之前您永远不会退出该方法。简而言之,您正在阻塞当前线程,这在 Reactor 中是禁止的。

使用线程让 Reactor 有机会分派这些信号。但是 Flux.create() 的精神(如其 javadoc 所示)是关于适应回调。

在这种情况下,也许 Flux.generate 更合适。尝试用 Thread 模拟延迟不是一个好主意。您可以改为执行以下操作:

Flux<String> response = Flux.just(aListOfElements).delayElements(Duration.ofSeconds(2));