项目反应堆发布者初始化需要时间进行首次初始化
Project reactor publishers initialisation taking time for first time initialisation
我最近探索了项目反应器库,并尝试将它用于我的用例,其中我有一个任务列表,一些任务依赖于其他任务的执行,一些任务可以并行执行以提高性能.执行顺序以有向无环图的形式出现。下面是它的 POC 代码:
public class ReactorPOC {
public static void main(String args[]) {
//First time executing mono is taking long time
run();
//All subsequent executions not excess time
run();
run();
}
public static void run() {
try {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Long st = System.currentTimeMillis();
Publisher one = getTask(60, "one", executorService, st).cache();
Publisher two = getTask(60, "two", executorService, st).cache();
Publisher three = getTask(60, "three", executorService, st).cache();
Publisher four = getTask(60, "four", executorService, st).cache();
Publisher eight = getTask(60, "eight", executorService, st).cache();
Publisher five = getTask(60, "five", executorService, st).cache();
Publisher six = getTask(60, "six", executorService, st).cache();
Publisher seven = getTask(60, "seven", executorService, st).cache();
three = Flux.concat(Flux.merge(one, two), three);
five = Flux.concat(Flux.merge(three, four, eight), five);
six = Flux.concat(five, six);
seven = Flux.concat(five, seven);
Flux last = Flux.merge(one, two, three, four, five, six, seven, eight);
last.blockLast();
System.out.println(System.currentTimeMillis() - st);
} catch (Exception e) {
System.out.println(e);
}
}
static Mono getTask(int sleep, String task, ExecutorService executorService, long st) {
return Mono.just(task).doOnSubscribe( i -> {
System.out.println("Starting " + task + " at " + (System.currentTimeMillis() - st));
try {
Thread.sleep(sleep);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Ending " + task + " at " + (System.currentTimeMillis() - st));
}).subscribeOn(Schedulers.fromExecutor(executorService));
}
}
这在执行顺序方面符合预期。但是我有两个疑问:
我已经执行了 3 次图形执行(从 main 函数调用 3 次)。第一次,它花费了大约 1200 毫秒,这太长了,对于所有下一次执行,它花费了大约 250 毫秒,这是它应该花费的预期时间。我正在尝试理解为什么第一次要花这么长时间。
如果有任何任务中断,我希望有一种方法可以抛出异常,而不是按执行顺序继续进行。有什么办法吗?我有一种方法可以想到我将在何处保留共享对象并设置一个错误字段,所有接下来的任务将首先查看该字段,然后决定不执行该任务。我想看看有没有更好的方法。
请帮助澄清以上两个问题。
另外,我对这个库和整个反应式范式都不熟悉。所以,如果上面的代码上有什么inputs/suggestions就好了
谢谢:)
据我从 运行 AsyncProfiler 和火焰图来看,这似乎纯粹是 class 加载的一个因素。
代码可以改进,特别是删除缓存并避免在 doOnSubscribe
内部阻塞(这是一种代码味道),但这并没有太大改变。
为了更好地演示,我将 run()
方法复制粘贴为 run2()
并让 main
执行 run1()
然后 run2()
(这是完全相同的代码)。然后我们可以观察到 run1()
总是需要更多的时间,不同之处在于 class loading:
如果我们先启动 run2()
,它会成为火焰图中突出的 class加载延迟:
我最近探索了项目反应器库,并尝试将它用于我的用例,其中我有一个任务列表,一些任务依赖于其他任务的执行,一些任务可以并行执行以提高性能.执行顺序以有向无环图的形式出现。下面是它的 POC 代码:
public class ReactorPOC {
public static void main(String args[]) {
//First time executing mono is taking long time
run();
//All subsequent executions not excess time
run();
run();
}
public static void run() {
try {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Long st = System.currentTimeMillis();
Publisher one = getTask(60, "one", executorService, st).cache();
Publisher two = getTask(60, "two", executorService, st).cache();
Publisher three = getTask(60, "three", executorService, st).cache();
Publisher four = getTask(60, "four", executorService, st).cache();
Publisher eight = getTask(60, "eight", executorService, st).cache();
Publisher five = getTask(60, "five", executorService, st).cache();
Publisher six = getTask(60, "six", executorService, st).cache();
Publisher seven = getTask(60, "seven", executorService, st).cache();
three = Flux.concat(Flux.merge(one, two), three);
five = Flux.concat(Flux.merge(three, four, eight), five);
six = Flux.concat(five, six);
seven = Flux.concat(five, seven);
Flux last = Flux.merge(one, two, three, four, five, six, seven, eight);
last.blockLast();
System.out.println(System.currentTimeMillis() - st);
} catch (Exception e) {
System.out.println(e);
}
}
static Mono getTask(int sleep, String task, ExecutorService executorService, long st) {
return Mono.just(task).doOnSubscribe( i -> {
System.out.println("Starting " + task + " at " + (System.currentTimeMillis() - st));
try {
Thread.sleep(sleep);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Ending " + task + " at " + (System.currentTimeMillis() - st));
}).subscribeOn(Schedulers.fromExecutor(executorService));
}
}
这在执行顺序方面符合预期。但是我有两个疑问:
我已经执行了 3 次图形执行(从 main 函数调用 3 次)。第一次,它花费了大约 1200 毫秒,这太长了,对于所有下一次执行,它花费了大约 250 毫秒,这是它应该花费的预期时间。我正在尝试理解为什么第一次要花这么长时间。
如果有任何任务中断,我希望有一种方法可以抛出异常,而不是按执行顺序继续进行。有什么办法吗?我有一种方法可以想到我将在何处保留共享对象并设置一个错误字段,所有接下来的任务将首先查看该字段,然后决定不执行该任务。我想看看有没有更好的方法。
请帮助澄清以上两个问题。
另外,我对这个库和整个反应式范式都不熟悉。所以,如果上面的代码上有什么inputs/suggestions就好了
谢谢:)
据我从 运行 AsyncProfiler 和火焰图来看,这似乎纯粹是 class 加载的一个因素。
代码可以改进,特别是删除缓存并避免在 doOnSubscribe
内部阻塞(这是一种代码味道),但这并没有太大改变。
为了更好地演示,我将 run()
方法复制粘贴为 run2()
并让 main
执行 run1()
然后 run2()
(这是完全相同的代码)。然后我们可以观察到 run1()
总是需要更多的时间,不同之处在于 class loading:
如果我们先启动 run2()
,它会成为火焰图中突出的 class加载延迟: