Spring reactive 中的 MonoProcessor 是做什么用的?
What is MonoProcessor in Spring reactive used for?
在查看来自 Spring 框架的 WiretapConnector
的源代码时,我偶然发现了一个 MonoProcessor
类型的对象。我尝试使用谷歌搜索解释它的用途,但无济于事。
Javadoc 对 Reactive/Reactor 外行人说的不多:
A MonoProcessor
is a Mono extension that implements stateful
semantics. Multi-subscribe is allowed. Once a MonoProcessor
has been
resolved, newer subscribers will benefit from the cached result.
这最后一句暗示了计算的结果被缓存了,这似乎是MonoProcessor
在这个code中的使用。
有人可以阐明 MonoProcessor
的预期用例是什么以及为什么首先引入它吗?
用例是您想要一个处理器来创建 "hot" Mono,而不是 Flux。还提供取消、处理、onNext 等处理器功能。由于 Mono 是单个值,它反过来只能使用单个 onNext,因此结果会被缓存以供将来订阅。有效地将其从 "hot" 变为 "cold"。
热门单声道示例
//Set up flux processor, sink for thread safety
DirectProcessor<Integer> directProcessor = DirectProcessor.create();
FluxSink<Integer> sink = directProcessor.serialize().sink();
//Allows dynamic creation of Mono value, after initialisation
MonoProcessor<Integer> processor =
directProcessor.filter(s -> s > 5)
.next()
.toProcessor();
//Set up subscriptions, no values have been submitted to either yet
processor.map(i -> "monoProc: " + i).subscribe(System.out::println);
directProcessor.map(i -> "DirectProc: " + i).subscribe(System.out::println);
//Uncomment and above Mono subscription will never occur
//processor.cancel();
//Values from some other service or whatever
for (int i = 0; i < 10; i++) {
sink.next(i);
}
//Do something later with cached result
processor.map(i -> "monoProc cached: " + i).subscribe(System.out::println);
在查看来自 Spring 框架的 WiretapConnector
的源代码时,我偶然发现了一个 MonoProcessor
类型的对象。我尝试使用谷歌搜索解释它的用途,但无济于事。
Javadoc 对 Reactive/Reactor 外行人说的不多:
A
MonoProcessor
is a Mono extension that implements stateful semantics. Multi-subscribe is allowed. Once aMonoProcessor
has been resolved, newer subscribers will benefit from the cached result.
这最后一句暗示了计算的结果被缓存了,这似乎是MonoProcessor
在这个code中的使用。
有人可以阐明 MonoProcessor
的预期用例是什么以及为什么首先引入它吗?
用例是您想要一个处理器来创建 "hot" Mono,而不是 Flux。还提供取消、处理、onNext 等处理器功能。由于 Mono 是单个值,它反过来只能使用单个 onNext,因此结果会被缓存以供将来订阅。有效地将其从 "hot" 变为 "cold"。
热门单声道示例
//Set up flux processor, sink for thread safety
DirectProcessor<Integer> directProcessor = DirectProcessor.create();
FluxSink<Integer> sink = directProcessor.serialize().sink();
//Allows dynamic creation of Mono value, after initialisation
MonoProcessor<Integer> processor =
directProcessor.filter(s -> s > 5)
.next()
.toProcessor();
//Set up subscriptions, no values have been submitted to either yet
processor.map(i -> "monoProc: " + i).subscribe(System.out::println);
directProcessor.map(i -> "DirectProc: " + i).subscribe(System.out::println);
//Uncomment and above Mono subscription will never occur
//processor.cancel();
//Values from some other service or whatever
for (int i = 0; i < 10; i++) {
sink.next(i);
}
//Do something later with cached result
processor.map(i -> "monoProc cached: " + i).subscribe(System.out::println);