像 flatMap() 这样在同一个主线程上运行的 Reactor 运算符是否比常规阻塞代码更有效?
Are Reactor operators like flatMap() that runs on same main thread any more efficient than regular blocking code?
我知道很多反应堆操作员喜欢 flatMap() 运行 在调用 onNext 方法的同一线程上。我想了解的是,这种方法是否比 for 循环中的常规阻塞调用更 efficient/performant 非阻塞。对不起,如果这是一个菜鸟问题,但我似乎无法理解。 Reactive 的力量只有在我们使用跳线程的 Schedulers(例如 Schedulers.parallel()) 等等时才能实现吗?
例如如果我有如下功能
List<Integer> integers = Arrays.asList(5,6,7,8,9,10);
Flux.fromIterable(integers)
.flatMap(this::makeAReactiveMethodCall)
.subscribe(r -> log.info(Thread.currentThread().getName()));
日志看起来像这样 - 注意所有线程都是相同的“主”线程。
01:25:40.641 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
所有调用都发生在同一个主线程上。这段代码比在具有阻塞语义的 for 循环中进行所有调用更有效吗?说出下面的代码?
integers.forEach(i -> this.makeAReactiveMethodCall(i).block());
这不是 makeReactiveMethodCall()
是否在做 CPU 绑定的工作,或者它是否根本不是真正的反应而是变相的阻塞调用。
当 makeReactiveMethodCall
引入一些延迟时效率更高,例如。通过以被动方式执行 I/O。
对于您可能要考虑的各种处理步骤,在组合和使用统一抽象方面也存在权衡。
但是如果您追求 CPU 绑定代码的纯吞吐量,那么一定要使用一个好的旧循环。
假设每个 makeAReactiveMethodCall
做一些 I/O 并且需要 1 秒才能完成。使用 flatMap
运算符,您的调用将异步进行。这意味着主线程将在不等待 I/O 操作完成(非阻塞)的情况下进行所有 6 个调用,而是处理一些其他工作,并在调用完成时收到通知。在 WebClient
和 Project Reactor 的情况下,这是通过对 queue/dispatch/process 事件使用 Netty 事件循环来实现的。
在传统的阻塞方式中(例如RestTemplate
),同步进行所有 6 个调用需要 6 秒。当然,您可以使用 ExecutorService
API 使其异步,但在那种情况下,您将需要 6 个线程,因为调用会阻塞。响应式模型的优点之一是线程数量有限,因此不会在创建多线程时浪费资源。
我知道很多反应堆操作员喜欢 flatMap() 运行 在调用 onNext 方法的同一线程上。我想了解的是,这种方法是否比 for 循环中的常规阻塞调用更 efficient/performant 非阻塞。对不起,如果这是一个菜鸟问题,但我似乎无法理解。 Reactive 的力量只有在我们使用跳线程的 Schedulers(例如 Schedulers.parallel()) 等等时才能实现吗?
例如如果我有如下功能
List<Integer> integers = Arrays.asList(5,6,7,8,9,10);
Flux.fromIterable(integers)
.flatMap(this::makeAReactiveMethodCall)
.subscribe(r -> log.info(Thread.currentThread().getName()));
日志看起来像这样 - 注意所有线程都是相同的“主”线程。
01:25:40.641 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
所有调用都发生在同一个主线程上。这段代码比在具有阻塞语义的 for 循环中进行所有调用更有效吗?说出下面的代码?
integers.forEach(i -> this.makeAReactiveMethodCall(i).block());
这不是 makeReactiveMethodCall()
是否在做 CPU 绑定的工作,或者它是否根本不是真正的反应而是变相的阻塞调用。
当 makeReactiveMethodCall
引入一些延迟时效率更高,例如。通过以被动方式执行 I/O。
对于您可能要考虑的各种处理步骤,在组合和使用统一抽象方面也存在权衡。
但是如果您追求 CPU 绑定代码的纯吞吐量,那么一定要使用一个好的旧循环。
假设每个 makeAReactiveMethodCall
做一些 I/O 并且需要 1 秒才能完成。使用 flatMap
运算符,您的调用将异步进行。这意味着主线程将在不等待 I/O 操作完成(非阻塞)的情况下进行所有 6 个调用,而是处理一些其他工作,并在调用完成时收到通知。在 WebClient
和 Project Reactor 的情况下,这是通过对 queue/dispatch/process 事件使用 Netty 事件循环来实现的。
在传统的阻塞方式中(例如RestTemplate
),同步进行所有 6 个调用需要 6 秒。当然,您可以使用 ExecutorService
API 使其异步,但在那种情况下,您将需要 6 个线程,因为调用会阻塞。响应式模型的优点之一是线程数量有限,因此不会在创建多线程时浪费资源。