从 Flux<String> 发射直到满足其中一个条件?
Emitting from Flux<String> until one of conditions are met?
我得到一个代表资源的 InputStream
(很可能是一个 .txt
文件),我的任务是解析该文件的行。
我必须使我的程序尽可能可修改,这意味着今天我正在从文件中读取数据,但明天数据可能会通过 TCP 流从世界的另一端到达。不应假设新线路到达的速度。
考虑到这一点,我决定尝试 asynchronous
/reactive
方法(如果有人可以提供一些很好的资源来解释差异,那就太好了)。
我在pom.xml
中添加了:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.4.4</version>
</dependency>
另外,我必须 read/parse 文件直到处理完 100 行或者直到我开始处理后 30 秒。因此,满足哪个条件,处理就应该停止。
到目前为止,我有:
String absolutePath = "C:\Users\User\Desktop\input.json";
InputStream is = new FileInputStream(absolutePath);
Flux<String> stringFlux = Flux.using(
() -> new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().limit(100),
Flux::fromStream,
Stream::close
).take(100);
stringFlux
.subscribe(s -> System.out.println(s + " " + new Date()), (error) -> System.out.println(error), () -> System.out.println("complete Chanel"));
我特意留下了.limit(100)
和.take(100)
,因为我不确定哪个更好(也许两者都错了?)。还有,我不知道怎么设置时间限制。
I have to read/parse the file until 100 lines have been processed
您在这里概述了两个明智的选择 - 要么限制基础流,要么限制通量本身。通常我提倡尽可能对源应用限制(在这种情况下是流),但是如果您希望能够轻松换出该源并保持限制不变,则应用 take(100)
在外部通量上可能是更好的选择。
这真的取决于你,这里没有关于什么更好的硬性规定 - 选择一个或另一个。
...or until 30 seconds have been passed since I started with processing.
There's another overload of take() that uses Duration。只需使用它,因此您的通量以 .take(100).take(Duration.ofSeconds(30))
.
结尾
(与问题不完全相关,但作为一般说明 - Files.lines()
是一种更简洁的读取文件的方式,而不是现在使用 BufferedReader
。)
我得到一个代表资源的 InputStream
(很可能是一个 .txt
文件),我的任务是解析该文件的行。
我必须使我的程序尽可能可修改,这意味着今天我正在从文件中读取数据,但明天数据可能会通过 TCP 流从世界的另一端到达。不应假设新线路到达的速度。
考虑到这一点,我决定尝试 asynchronous
/reactive
方法(如果有人可以提供一些很好的资源来解释差异,那就太好了)。
我在pom.xml
中添加了:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.4.4</version>
</dependency>
另外,我必须 read/parse 文件直到处理完 100 行或者直到我开始处理后 30 秒。因此,满足哪个条件,处理就应该停止。
到目前为止,我有:
String absolutePath = "C:\Users\User\Desktop\input.json";
InputStream is = new FileInputStream(absolutePath);
Flux<String> stringFlux = Flux.using(
() -> new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().limit(100),
Flux::fromStream,
Stream::close
).take(100);
stringFlux
.subscribe(s -> System.out.println(s + " " + new Date()), (error) -> System.out.println(error), () -> System.out.println("complete Chanel"));
我特意留下了.limit(100)
和.take(100)
,因为我不确定哪个更好(也许两者都错了?)。还有,我不知道怎么设置时间限制。
I have to read/parse the file until 100 lines have been processed
您在这里概述了两个明智的选择 - 要么限制基础流,要么限制通量本身。通常我提倡尽可能对源应用限制(在这种情况下是流),但是如果您希望能够轻松换出该源并保持限制不变,则应用 take(100)
在外部通量上可能是更好的选择。
这真的取决于你,这里没有关于什么更好的硬性规定 - 选择一个或另一个。
...or until 30 seconds have been passed since I started with processing.
There's another overload of take() that uses Duration。只需使用它,因此您的通量以 .take(100).take(Duration.ofSeconds(30))
.
(与问题不完全相关,但作为一般说明 - Files.lines()
是一种更简洁的读取文件的方式,而不是现在使用 BufferedReader
。)