如何检测流并跟踪进度? (香草 Java8 或 cylcops-react 反应流)
How to instrument streams and track progress? (vanilla Java8 or cylcops-react reactive streams)
给定一些使用流处理大量项目的代码,检测日志记录和 performance/profiling 的各个步骤的最佳方法是什么?
实例:
ReactiveSeq.fromStream(pairs)
.filter(this::satisfiesThreshold)
.filter(this::satisfiesPersistConditions)
.map((pair) -> convertToResult(pair, jobId))
.flatMap(Option::toJavaStream)
.grouped(CHUNK_SIZE)
.forEach((chunk) ->
{
repository.save(chunk);
incrementAndReport();
});
reportProcessingTime();
记录进度很重要,因此我可以在更新用户界面的另一个线程中触发进度事件。
希望跟踪此流中过滤和映射步骤的性能特征,以了解可以在何处进行优化以加快速度。
我看到三个选项:
- 在每个函数中放入logging/profiling代码
- 在每个步骤周围使用
peek
,而不实际使用值
- 某种基于注释或 AOP 的解决方案(不知道是什么)
哪个最好?关于#3 会是什么样子的任何想法?还有其他解决方案吗?
这里有几个选项(如果我理解正确的话):-
我们可以使用 elapsed 运算符来跟踪元素发射之间经过的时间,例如
ReactiveSeq.fromStream(Stream.of(1,2))
.filter(this::include)
.elapsed()
.map(this::logAndUnwrap)
Long[] filterTimeTakenMillis = new Long[maxSize];
int filterIndex = 0;
private <T> T logAndUnwrap(Tuple2<T, Long> t) {
//capture the elapsed time (t.v2) and then unwrap the tuple
filterTimeTakenMillis[filterIndex++]=t.v2;
return t.v1;
}
这仅适用于 cyclops-react 流。
- 我们可以利用 FluentFunctions 中的 AOP-like 功能
例如
ReactiveSeq.fromStream(Stream.of(1,2))
.filter(this::include)
.elapsed()
.map(this::logAndUnwrap)
.map(FluentFunctions.of(this::convertToResult)
.around(a->{
SimpleTimer timer = new SimpleTimer();
String r = a.proceed();
mapTimeTakenNanos[mapIndex++]=timer.getElapsedNanos();
return r;
}));
这也适用于原版 Java 8 条流。
给定一些使用流处理大量项目的代码,检测日志记录和 performance/profiling 的各个步骤的最佳方法是什么?
实例:
ReactiveSeq.fromStream(pairs)
.filter(this::satisfiesThreshold)
.filter(this::satisfiesPersistConditions)
.map((pair) -> convertToResult(pair, jobId))
.flatMap(Option::toJavaStream)
.grouped(CHUNK_SIZE)
.forEach((chunk) ->
{
repository.save(chunk);
incrementAndReport();
});
reportProcessingTime();
记录进度很重要,因此我可以在更新用户界面的另一个线程中触发进度事件。
希望跟踪此流中过滤和映射步骤的性能特征,以了解可以在何处进行优化以加快速度。
我看到三个选项:
- 在每个函数中放入logging/profiling代码
- 在每个步骤周围使用
peek
,而不实际使用值 - 某种基于注释或 AOP 的解决方案(不知道是什么)
哪个最好?关于#3 会是什么样子的任何想法?还有其他解决方案吗?
这里有几个选项(如果我理解正确的话):-
我们可以使用 elapsed 运算符来跟踪元素发射之间经过的时间,例如
ReactiveSeq.fromStream(Stream.of(1,2)) .filter(this::include) .elapsed() .map(this::logAndUnwrap) Long[] filterTimeTakenMillis = new Long[maxSize]; int filterIndex = 0; private <T> T logAndUnwrap(Tuple2<T, Long> t) { //capture the elapsed time (t.v2) and then unwrap the tuple filterTimeTakenMillis[filterIndex++]=t.v2; return t.v1; }
这仅适用于 cyclops-react 流。
- 我们可以利用 FluentFunctions 中的 AOP-like 功能
例如
ReactiveSeq.fromStream(Stream.of(1,2))
.filter(this::include)
.elapsed()
.map(this::logAndUnwrap)
.map(FluentFunctions.of(this::convertToResult)
.around(a->{
SimpleTimer timer = new SimpleTimer();
String r = a.proceed();
mapTimeTakenNanos[mapIndex++]=timer.getElapsedNanos();
return r;
}));
这也适用于原版 Java 8 条流。