Spring reactive 中的 MonoProcessor 是做什么用的?

What is MonoProcessor in Spring reactive used for?

在查看来自 Spring 框架的 WiretapConnector 的源代码时,我偶然发现了一个 MonoProcessor 类型的对象。我尝试使用谷歌搜索解释它的用途,但无济于事。

Javadoc 对 Reactive/Reactor 外行人说的不多:

A MonoProcessor is a Mono extension that implements stateful semantics. Multi-subscribe is allowed. Once a MonoProcessor has been resolved, newer subscribers will benefit from the cached result.

这最后一句暗示了计算的结果被缓存了,这似乎是MonoProcessor在这个code中的使用。

有人可以阐明 MonoProcessor 的预期用例是什么以及为什么首先引入它吗?

用例是您想要一个处理器来创建 "hot" Mono,而不是 Flux。还提供取消、处理、onNext 等处理器功能。由于 Mono 是单个值,它反过来只能使用单个 onNext,因此结果会被缓存以供将来订阅。有效地将其从 "hot" 变为 "cold"。

热门单声道示例

   //Set up flux processor, sink for thread safety
    DirectProcessor<Integer> directProcessor = DirectProcessor.create();
    FluxSink<Integer> sink = directProcessor.serialize().sink();

    //Allows dynamic creation of Mono value, after initialisation
    MonoProcessor<Integer> processor =
            directProcessor.filter(s -> s > 5)
            .next()
            .toProcessor();

     //Set up subscriptions, no values have been submitted to either yet
     processor.map(i -> "monoProc: " + i).subscribe(System.out::println);
     directProcessor.map(i -> "DirectProc: " + i).subscribe(System.out::println);

    //Uncomment and above Mono subscription will never occur
    //processor.cancel();

    //Values from some other service or whatever
    for (int i = 0; i < 10; i++) {
        sink.next(i);
    }

    //Do something later with cached result
    processor.map(i -> "monoProc cached: " + i).subscribe(System.out::println);