Vertx RxJava 运算符迭代不过滤内容
Vertx RxJava Operator iteration not filtering the content
我正在从文件中读取内容,我用换行符将一些名称放入文件中,但是在通过 vertx 文件系统读取文件时,我无法提供所需的过滤器。每次打印文件的所有数据。
这是代码片段:-
vertx.fileSystem().open("data.txt", new OpenOptions(), handler -> {
final AsyncFile asyncFile = handler.result();
final Observable<Buffer> observable = asyncFile.toObservable();
observable.subscribe(item -> {
final String[] split = item.toString().split("\n\r");
List<String> list = Arrays.asList(split);
final Observable<String> stringObservable = Observable.fromIterable(list);
stringObservable
.filter(name -> name.toString().startsWith("R"))
.take(2)
.subscribe(str -> System.out.println(str), err -> System.out.println(err), () -> System.out.println("Inner loop completed"));
}, error -> System.out.println(error), () -> System.out.println("Completed !!!"));
});
发现上面的observable一次性拥有文件的所有数据后,我使用了内部observable
如果我拆分为“\n”而不是“\n\r”,这个代码片段实际上对我来说工作得很好。如果您在打印所有内容时遇到问题,我的猜测是整个文件中的第一个字符是“R”,然后文件中的任何地方实际上都没有“\n\r”。因此,当您尝试拆分时,您最终只会得到一个包含整个文件的大字符串。
就是说,您可以通过使用 rxOpen
而不是 open
、避免订阅的嵌套并简化将 Buffer
转换为 [= 的方式来显着简化此代码14=] 发出 Buffer
.
中的每一行
另请注意,我使用 RecordParser
使用“\n”字符作为分隔符将文件内容拆分为标记。直接使用 toObservable()
将 AsyncFile
转换为 Observable<Buffer>
可能会导致 Buffer
s 在一条线的中途切断,这会搞砸你的解析。
所以把它们放在一起:
vertx.fileSystem().rxOpen("data.txt", new OpenOptions())
.flatMapObservable(af -> RecordParser.newDelimited("\n", af).toObservable())
.map(Buffer::toString)
.filter(name -> name.startsWith("R"))
.take(2)
.subscribe(System.out::println, System.err::println, () -> System.out.println("Completed"));
你从 13 行变成了 6 行。
我正在从文件中读取内容,我用换行符将一些名称放入文件中,但是在通过 vertx 文件系统读取文件时,我无法提供所需的过滤器。每次打印文件的所有数据。
这是代码片段:-
vertx.fileSystem().open("data.txt", new OpenOptions(), handler -> {
final AsyncFile asyncFile = handler.result();
final Observable<Buffer> observable = asyncFile.toObservable();
observable.subscribe(item -> {
final String[] split = item.toString().split("\n\r");
List<String> list = Arrays.asList(split);
final Observable<String> stringObservable = Observable.fromIterable(list);
stringObservable
.filter(name -> name.toString().startsWith("R"))
.take(2)
.subscribe(str -> System.out.println(str), err -> System.out.println(err), () -> System.out.println("Inner loop completed"));
}, error -> System.out.println(error), () -> System.out.println("Completed !!!"));
});
发现上面的observable一次性拥有文件的所有数据后,我使用了内部observable
如果我拆分为“\n”而不是“\n\r”,这个代码片段实际上对我来说工作得很好。如果您在打印所有内容时遇到问题,我的猜测是整个文件中的第一个字符是“R”,然后文件中的任何地方实际上都没有“\n\r”。因此,当您尝试拆分时,您最终只会得到一个包含整个文件的大字符串。
就是说,您可以通过使用 rxOpen
而不是 open
、避免订阅的嵌套并简化将 Buffer
转换为 [= 的方式来显着简化此代码14=] 发出 Buffer
.
另请注意,我使用 RecordParser
使用“\n”字符作为分隔符将文件内容拆分为标记。直接使用 toObservable()
将 AsyncFile
转换为 Observable<Buffer>
可能会导致 Buffer
s 在一条线的中途切断,这会搞砸你的解析。
所以把它们放在一起:
vertx.fileSystem().rxOpen("data.txt", new OpenOptions())
.flatMapObservable(af -> RecordParser.newDelimited("\n", af).toObservable())
.map(Buffer::toString)
.filter(name -> name.startsWith("R"))
.take(2)
.subscribe(System.out::println, System.err::println, () -> System.out.println("Completed"));
你从 13 行变成了 6 行。