Java Reactor:如何从标准输入生成 Flux?

Java Reactor: How to produce Flux from stdin?

我想从标准输入中异步读取用户生成的消息。 类似于:

Flux.from(stdinPublisher()) 
    .subscribe(msg -> System.out.println("Received: " + msg));

那么如何在这里实现这样的标准输入发布者呢?

很简单。抱歉打扰:)

import java.util.Scanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Component
@Slf4j
public class StdinProducerExample implements ApplicationRunner {

  @Override
  public void run(ApplicationArguments args) throws Exception {
    Flux
        .create(sink -> {
          Scanner scanner = new Scanner(System.in);
          while (scanner.hasNext()) {
            sink.next(scanner.nextLine());
          }
        })
        .subscribeOn(Schedulers.newSingle("stdin publisher"))
        .subscribe(m -> log.info("User message: {}", m));
    log.info("Started listening stdin");
  }

}

另一种使用 Reactor 生成数据的方法是 Processors

FluxProcessor sinks safely gate multi-threaded producers and can be used by applications that generate data from multiple threads concurrently. For example, you can create a thread-safe serialized sink for UnicastProcessor. Multiple producer threads may concurrently generate data on the following serialized sink:

public class FluxProcessorSample {

  public static void main(String[] args) {    
    FluxProcessor<String, String> processor = UnicastProcessor.<String>create().serialize();
    FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);

    // Print input to STDOUT
    Executors.newSingleThreadScheduledExecutor()
        .execute(() -> processor
            .publishOn(Schedulers.elastic())
            .map(str -> "1>> " + str)
            .subscribe(System.out::println));

    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNext()) {
      sink.next(scanner.nextLine());
    }
  }
}

A UnicastProcessor 可以通过使用内部缓冲区来处理背压。权衡是它最多可以有一个 Subscriber。如果您在订阅者尚未请求数据时通过它推送任何数量的数据,它会缓冲所有数据。

其他FluxProcessor implementations是:

  • DirectProcessor - 可以将信号分派给零个或多个 Subscribers。它有不处理背压的限制

  • EmitterProcessor - 可以在尊重的同时发送给多个订阅者 每个订阅者的背压。当它没有订阅者时, 它仍然可以接受一些数据推送到可配置的 bufferSize.