Vertx RxJava 运算符迭代不过滤内容

Vertx RxJava Operator iteration not filtering the content

我正在从文件中读取内容,我用换行符将一些名称放入文件中,但是在通过 vertx 文件系统读取文件时,我无法提供所需的过滤器。每次打印文件的所有数据。

这是代码片段:-

vertx.fileSystem().open("data.txt", new OpenOptions(), handler -> {
                final AsyncFile asyncFile = handler.result();
                final Observable<Buffer> observable = asyncFile.toObservable();
                observable.subscribe(item -> {
                    final String[] split = item.toString().split("\n\r");
                    List<String> list = Arrays.asList(split);
                    final Observable<String> stringObservable = Observable.fromIterable(list);
                    stringObservable
                            .filter(name -> name.toString().startsWith("R"))
                            .take(2)
                            .subscribe(str -> System.out.println(str), err -> System.out.println(err), () -> System.out.println("Inner loop completed"));


                }, error -> System.out.println(error), () -> System.out.println("Completed !!!"));
            });

发现上面的observable一次性拥有文件的所有数据后,我使用了内部observable

如果我拆分为“\n”而不是“\n\r”,这个代码片段实际上对我来说工作得很好。如果您在打印所有内容时遇到问题,我的猜测是整个文件中的第一个字符是“R”,然后文件中的任何地方实际上都没有“\n\r”。因此,当您尝试拆分时,您最终只会得到一个包含整个文件的大字符串。

就是说,您可以通过使用 rxOpen 而不是 open、避免订阅的嵌套并简化将 Buffer 转换为 [= 的方式来显着简化此代码14=] 发出 Buffer.

中的每一行

另请注意,我使用 RecordParser 使用“\n”字符作为分隔符将文件内容拆分为标记。直接使用 toObservable()AsyncFile 转换为 Observable<Buffer> 可能会导致 Buffers 在一条线的中途切断,这会搞砸你的解析。

所以把它们放在一起:

vertx.fileSystem().rxOpen("data.txt", new OpenOptions())
    .flatMapObservable(af -> RecordParser.newDelimited("\n", af).toObservable())
    .map(Buffer::toString)
    .filter(name -> name.startsWith("R"))
    .take(2)
    .subscribe(System.out::println, System.err::println, () -> System.out.println("Completed"));

你从 13 行变成了 6 行。