从 Flux<String> 发射直到满足其中一个条件?

Emitting from Flux<String> until one of conditions are met?

我得到一个代表资源的 InputStream(很可能是一个 .txt 文件),我的任务是解析该文件的行。

我必须使我的程序尽可能可修改,这意味着今天我正在从文件中读取数据,但明天数据可能会通过 TCP 流从世界的另一端到达。不应假设新线路到达的速度。

考虑到这一点,我决定尝试 asynchronous/reactive 方法(如果有人可以提供一些很好的资源来解释差异,那就太好了)。

我在pom.xml中添加了:

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <version>2.4.4</version>
    </dependency>

另外,我必须 read/parse 文件直到处理完 100 行或者直到我开始处理后 30 秒。因此,满足哪个条件,处理就应该停止。

到目前为止,我有:

    String absolutePath = "C:\Users\User\Desktop\input.json"; 
    InputStream is = new FileInputStream(absolutePath);
    
    
    Flux<String> stringFlux = Flux.using(
            () -> new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().limit(100),
            Flux::fromStream,
            Stream::close
    ).take(100);
    
    stringFlux
    .subscribe(s -> System.out.println(s + " " + new Date()), (error) -> System.out.println(error),  () -> System.out.println("complete Chanel"));

我特意留下了.limit(100).take(100),因为我不确定哪个更好(也许两者都错了?)。还有,我不知道怎么设置时间限制。

I have to read/parse the file until 100 lines have been processed

您在这里概述了两个明智的选择 - 要么限制基础流,要么限制通量本身。通常我提倡尽可能对源应用限制(在这种情况下是流),但是如果您希望能够轻松换出该源并保持限制不变,则应用 take(100) 在外部通量上可能是更好的选择。

这真的取决于你,这里没有关于什么更好的硬性规定 - 选择一个或另一个。

...or until 30 seconds have been passed since I started with processing.

There's another overload of take() that uses Duration。只需使用它,因此您的通量以 .take(100).take(Duration.ofSeconds(30)).

结尾

(与问题不完全相关,但作为一般说明 - Files.lines() 是一种更简洁的读取文件的方式,而不是现在使用 BufferedReader。)