Java Reactor:如何从标准输入生成 Flux?
Java Reactor: How to produce Flux from stdin?
我想从标准输入中异步读取用户生成的消息。
类似于:
Flux.from(stdinPublisher())
.subscribe(msg -> System.out.println("Received: " + msg));
那么如何在这里实现这样的标准输入发布者呢?
很简单。抱歉打扰:)
import java.util.Scanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@Component
@Slf4j
public class StdinProducerExample implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
Flux
.create(sink -> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
sink.next(scanner.nextLine());
}
})
.subscribeOn(Schedulers.newSingle("stdin publisher"))
.subscribe(m -> log.info("User message: {}", m));
log.info("Started listening stdin");
}
}
另一种使用 Reactor 生成数据的方法是 Processors
。
FluxProcessor
sinks safely gate multi-threaded producers and can be
used by applications that generate data from multiple threads
concurrently. For example, you can create a thread-safe serialized
sink for UnicastProcessor
. Multiple producer threads may
concurrently generate data on the following serialized sink:
public class FluxProcessorSample {
public static void main(String[] args) {
FluxProcessor<String, String> processor = UnicastProcessor.<String>create().serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
// Print input to STDOUT
Executors.newSingleThreadScheduledExecutor()
.execute(() -> processor
.publishOn(Schedulers.elastic())
.map(str -> "1>> " + str)
.subscribe(System.out::println));
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
sink.next(scanner.nextLine());
}
}
}
A UnicastProcessor
可以通过使用内部缓冲区来处理背压。权衡是它最多可以有一个 Subscriber
。如果您在订阅者尚未请求数据时通过它推送任何数量的数据,它会缓冲所有数据。
其他FluxProcessor
implementations是:
DirectProcessor
- 可以将信号分派给零个或多个
Subscribers
。它有不处理背压的限制
EmitterProcessor
- 可以在尊重的同时发送给多个订阅者
每个订阅者的背压。当它没有订阅者时,
它仍然可以接受一些数据推送到可配置的
bufferSize
.
我想从标准输入中异步读取用户生成的消息。 类似于:
Flux.from(stdinPublisher())
.subscribe(msg -> System.out.println("Received: " + msg));
那么如何在这里实现这样的标准输入发布者呢?
很简单。抱歉打扰:)
import java.util.Scanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@Component
@Slf4j
public class StdinProducerExample implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
Flux
.create(sink -> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
sink.next(scanner.nextLine());
}
})
.subscribeOn(Schedulers.newSingle("stdin publisher"))
.subscribe(m -> log.info("User message: {}", m));
log.info("Started listening stdin");
}
}
另一种使用 Reactor 生成数据的方法是 Processors
。
FluxProcessor
sinks safely gate multi-threaded producers and can be used by applications that generate data from multiple threads concurrently. For example, you can create a thread-safe serialized sink forUnicastProcessor
. Multiple producer threads may concurrently generate data on the following serialized sink:
public class FluxProcessorSample {
public static void main(String[] args) {
FluxProcessor<String, String> processor = UnicastProcessor.<String>create().serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
// Print input to STDOUT
Executors.newSingleThreadScheduledExecutor()
.execute(() -> processor
.publishOn(Schedulers.elastic())
.map(str -> "1>> " + str)
.subscribe(System.out::println));
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
sink.next(scanner.nextLine());
}
}
}
A UnicastProcessor
可以通过使用内部缓冲区来处理背压。权衡是它最多可以有一个 Subscriber
。如果您在订阅者尚未请求数据时通过它推送任何数量的数据,它会缓冲所有数据。
其他FluxProcessor
implementations是:
DirectProcessor
- 可以将信号分派给零个或多个Subscribers
。它有不处理背压的限制EmitterProcessor
- 可以在尊重的同时发送给多个订阅者 每个订阅者的背压。当它没有订阅者时, 它仍然可以接受一些数据推送到可配置的bufferSize
.