反应流 - 超时批处理
Reactive Streams - batching with timeout
我正在考虑用 io.projectreactor
替换看起来非常接近 ReactiveStreams 的本地日志处理库。 objective 是为了减少我们维护的代码,并利用社区添加的任何新功能(eyeing operator fusion)。
首先,我需要使用 stdio 并将多行日志条目合并到将沿着管道流动的文本 blob。在 Filebeat 文档的 multiline log entries 章节中详细解释了用例(除非我们希望它在进程中)。
到目前为止我的代码是:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
这会处理检测到新日志头时的多行合并,但在现有库中,我们还会在超时后刷新累积的行(即,如果在 5 秒内未收到文本,则刷新记录).
在 Reactor 中建模的正确方法是什么?我需要编写自己的运算符,还是可以自定义任何现有运算符?
任何指向在 Project Reactor 或 RxJava 中实现此用例的相关示例和文档的指针将不胜感激。
buffer
运算符对我来说似乎是最合适和最简单的解决方案。
它有基于规模和时间的策略。
你有日志,所以我认为,你可以将行数解释为缓冲区大小。
此处示例 - 如何发出按 4 或 5 秒时间跨度分组的项目:
Observable<String> lineReader = Observable.<String>create(subscriber -> {
try {
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (String line = br.readLine(); line != null; line = br.readLine()) {
subscriber.onNext(line);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).subscribeOn(Schedulers.newThread());
lineReader
.buffer(5, TimeUnit.SECONDS,4)
.filter(lines -> !lines.isEmpty())
.subscribe(System.out::println);
这取决于您如何识别每个缓冲区的开始和结束,因此以下 RxJava 2 代码旨在提示有关使用主源的值打开和关闭缓冲区的门:
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();
Function<Flowable<String>, Flowable<List<String>>> f = o ->
o.buffer(o.filter(v -> v.contains("Start")),
v -> Flowable.merge(o.filter(w -> w.contains("End")),
Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
pp.publish(f)
.subscribe(System.out::println);
pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");
pp.onNext("Start");
pp.onNext("C");
scheduler.advanceTimeBy(5, TimeUnit.MINUTES);
pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
打印:
[Start, A, B, End]
[Start, C]
[Start, D, End]
它的工作原理是通过 publish
共享源代码,这允许重复使用来自上游的相同值,而无需同时拥有多个源副本 运行。开场是通过检测线上的一个"Start"字符串来控制的。关闭由 "End" 字符串的检测或宽限期后触发的计时器控制。
编辑:
如果 "Start" 也是下一批的指标,您可以用 "Start" 替换 "End" 检查并更改缓冲区的内容,因为它将包含新的header 在前一个缓冲区中否则:
pp.publish(f)
.doOnNext(v -> {
int s = v.size();
if (s > 1 && v.get(s - 1).contains("Start")) {
v.remove(s - 1);
}
})
.subscribe(System.out::println);
我正在考虑用 io.projectreactor
替换看起来非常接近 ReactiveStreams 的本地日志处理库。 objective 是为了减少我们维护的代码,并利用社区添加的任何新功能(eyeing operator fusion)。
首先,我需要使用 stdio 并将多行日志条目合并到将沿着管道流动的文本 blob。在 Filebeat 文档的 multiline log entries 章节中详细解释了用例(除非我们希望它在进程中)。
到目前为止我的代码是:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
这会处理检测到新日志头时的多行合并,但在现有库中,我们还会在超时后刷新累积的行(即,如果在 5 秒内未收到文本,则刷新记录).
在 Reactor 中建模的正确方法是什么?我需要编写自己的运算符,还是可以自定义任何现有运算符?
任何指向在 Project Reactor 或 RxJava 中实现此用例的相关示例和文档的指针将不胜感激。
buffer
运算符对我来说似乎是最合适和最简单的解决方案。
它有基于规模和时间的策略。 你有日志,所以我认为,你可以将行数解释为缓冲区大小。
此处示例 - 如何发出按 4 或 5 秒时间跨度分组的项目:
Observable<String> lineReader = Observable.<String>create(subscriber -> {
try {
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (String line = br.readLine(); line != null; line = br.readLine()) {
subscriber.onNext(line);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).subscribeOn(Schedulers.newThread());
lineReader
.buffer(5, TimeUnit.SECONDS,4)
.filter(lines -> !lines.isEmpty())
.subscribe(System.out::println);
这取决于您如何识别每个缓冲区的开始和结束,因此以下 RxJava 2 代码旨在提示有关使用主源的值打开和关闭缓冲区的门:
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();
Function<Flowable<String>, Flowable<List<String>>> f = o ->
o.buffer(o.filter(v -> v.contains("Start")),
v -> Flowable.merge(o.filter(w -> w.contains("End")),
Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
pp.publish(f)
.subscribe(System.out::println);
pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");
pp.onNext("Start");
pp.onNext("C");
scheduler.advanceTimeBy(5, TimeUnit.MINUTES);
pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
打印:
[Start, A, B, End]
[Start, C]
[Start, D, End]
它的工作原理是通过 publish
共享源代码,这允许重复使用来自上游的相同值,而无需同时拥有多个源副本 运行。开场是通过检测线上的一个"Start"字符串来控制的。关闭由 "End" 字符串的检测或宽限期后触发的计时器控制。
编辑:
如果 "Start" 也是下一批的指标,您可以用 "Start" 替换 "End" 检查并更改缓冲区的内容,因为它将包含新的header 在前一个缓冲区中否则:
pp.publish(f)
.doOnNext(v -> {
int s = v.size();
if (s > 1 && v.get(s - 1).contains("Start")) {
v.remove(s - 1);
}
})
.subscribe(System.out::println);