为通量消费者注册不同的线程
Reg different threads for flux consumer
在下面的两个示例中,处理通量流的行为似乎有所不同。
示例 1:
public static void main(String[] args) throws InterruptedException {
log.debug(" Before reading flux stream");
Flux<Customer> results = readFluxStream();
log.debug(Thread.currentThread().getName() + " : After reading flux stream");
results.subscribe(new LearnFlux().new CustomerConsumer());
Thread.sleep(5000);
log.debug(" Exit Main Thread ");
}
public static Flux<Customer> readFluxStream() {
List<Customer> customers = buildCustomers();
Customer[] customerArray = new Customer[customers.size()];
customerArray = customers.toArray(customerArray);
Flux<Customer> temp = Flux.fromArray(customerArray).delayElements(Duration.ofSeconds(1)).log();
return temp;
}
private class CustomerConsumer implements Consumer<Customer> {
@Override
public void accept(Customer customer) {
log.debug(Thread.currentThread().getName() + " This is a consumer " + customer.getFirstName());
}
}
从下面的日志中我们了解到 Flux 消费者 运行 在不同的线程中。(在 *** 中突出显示)。我在主线程中引入了睡眠,以便可以在控制台中捕获消费者日志。
19:07:24.695 [***main***] DEBUG com.learnjava.LearnFlux - Before reading flux stream
19:07:24.759 [***main***] DEBUG reactor.util.Loggers - Using Slf4j logging framework
19:07:24.779 [***main***] DEBUG com.learnjava.LearnFlux - main : After reading flux stream
19:07:24.788 [***main***] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
19:07:24.790 [***main***] INFO reactor.Flux.ConcatMap.1 - request(unbounded)
19:07:25.821 [***parallel-1***] INFO reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Tom', lastName='Cruise'])
19:07:25.835 [***parallel-1***] DEBUG com.learnjava.LearnFlux - parallel-1 This is a consumer Tom
19:07:26.841 [***parallel-2***] INFO reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Jim', lastName='Carry'])
19:07:26.842 [***parallel-2***] DEBUG com.learnjava.LearnFlux - parallel-2 This is a consumer Jim
19:07:26.844 [***parallel-2***] INFO reactor.Flux.ConcatMap.1 - onComplete()
19:07:29.817 [***main***] DEBUG com.learnjava.LearnFlux - Exit Main Thread
示例 2
public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
@Query("SELECT * FROM customer WHERE last_name = :lastname")
Flux<Customer> findByLastName(String lastName);
}
public class CustomerConsumer implements Consumer<Customer> {
private static final Logger log = LoggerFactory.getLogger(CustomerConsumer.class);
@Override
public void accept(Customer customer) {
log.info(" This is a concusmer " + customer);
}
}
log.info(" Invoking R2DBC flux response ");
Flux<Customer> customers = repository.findAll();
customers.subscribe(new CustomerConsumer());
log.info("complete consumer in main thread");
从下面的日志中,我们观察到消费者 运行 在同一个主线程中。 (在 *** 中突出显示)
[***main***] Invoking R2DBC flux response
[***main***] This is a concusmer Customer[id=1, firstName='Jack', lastName='Bauer']
[***main***]This is a concusmer Customer[id=2, firstName='Chloe', lastName='O'Brian']
[***main***] This is a concusmer Customer[id=3, firstName='Michelle', lastName='Dessler']
[***main***] complete consumer in main thread
澄清 :
为什么第一个示例中的 Flux 消费者 运行 在不同的线程中,而基于 R2DBC 的存储库(第二个示例)返回的 Flux 在同一主线程中处理?
Why is the Flux consumer in the first example running in a different thread where as the Flux returned by the R2DBC based repository (second example) is processed in the same main thread?
这里的关键理解是,任何反应式运算符 都可以 切换线程(或更准确地说,调度程序),因为它认为合适。虽然大多数运算符不会切换,但基于时间的运算符必须切换,并且它们将默认使用并行调度程序。
在第一个示例中,您使用的是 delayElements()
运算符。由于它是一个基于时间的运算符,默认情况下,它会切换到并行调度程序,然后 运行s 在并行执行程序上(以及您在日志中看到的并行线程)。基于时间的调度程序必须切换为“即时”调度程序,它会让您的操作保持在同一个线程上,但不能进行基于时间的调度(这是 delayElements
运算符所要求的。)
这并不是说您 必须 使用并行调度程序(如果您有特殊原因不这样做)- 有一个重载可以让您将其设置为您喜欢的任何值。例如,如果您使用 .delayElements(Duration.ofSeconds(1), Schedulers.boundedElastic())
,您会看到您的日志将显示正在使用的有界弹性线程池。
相反,在您的第二个 R2DBC 示例中,没有操作员将其从即时调度程序中切换。正如您从日志中看到的那样,它只会在主线程上 运行。
如果您想更深入地了解线程在反应器中的工作原理,Simon 的 Flux 演讲非常值得一看:https://m.youtube.com/watch?v=sNgTTcG-fEU - 还有一些随附的博客文章。
在下面的两个示例中,处理通量流的行为似乎有所不同。
示例 1:
public static void main(String[] args) throws InterruptedException {
log.debug(" Before reading flux stream");
Flux<Customer> results = readFluxStream();
log.debug(Thread.currentThread().getName() + " : After reading flux stream");
results.subscribe(new LearnFlux().new CustomerConsumer());
Thread.sleep(5000);
log.debug(" Exit Main Thread ");
}
public static Flux<Customer> readFluxStream() {
List<Customer> customers = buildCustomers();
Customer[] customerArray = new Customer[customers.size()];
customerArray = customers.toArray(customerArray);
Flux<Customer> temp = Flux.fromArray(customerArray).delayElements(Duration.ofSeconds(1)).log();
return temp;
}
private class CustomerConsumer implements Consumer<Customer> {
@Override
public void accept(Customer customer) {
log.debug(Thread.currentThread().getName() + " This is a consumer " + customer.getFirstName());
}
}
从下面的日志中我们了解到 Flux 消费者 运行 在不同的线程中。(在 *** 中突出显示)。我在主线程中引入了睡眠,以便可以在控制台中捕获消费者日志。
19:07:24.695 [***main***] DEBUG com.learnjava.LearnFlux - Before reading flux stream
19:07:24.759 [***main***] DEBUG reactor.util.Loggers - Using Slf4j logging framework
19:07:24.779 [***main***] DEBUG com.learnjava.LearnFlux - main : After reading flux stream
19:07:24.788 [***main***] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
19:07:24.790 [***main***] INFO reactor.Flux.ConcatMap.1 - request(unbounded)
19:07:25.821 [***parallel-1***] INFO reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Tom', lastName='Cruise'])
19:07:25.835 [***parallel-1***] DEBUG com.learnjava.LearnFlux - parallel-1 This is a consumer Tom
19:07:26.841 [***parallel-2***] INFO reactor.Flux.ConcatMap.1 - onNext(Customer[id=null, firstName='Jim', lastName='Carry'])
19:07:26.842 [***parallel-2***] DEBUG com.learnjava.LearnFlux - parallel-2 This is a consumer Jim
19:07:26.844 [***parallel-2***] INFO reactor.Flux.ConcatMap.1 - onComplete()
19:07:29.817 [***main***] DEBUG com.learnjava.LearnFlux - Exit Main Thread
示例 2
public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {
@Query("SELECT * FROM customer WHERE last_name = :lastname")
Flux<Customer> findByLastName(String lastName);
}
public class CustomerConsumer implements Consumer<Customer> {
private static final Logger log = LoggerFactory.getLogger(CustomerConsumer.class);
@Override
public void accept(Customer customer) {
log.info(" This is a concusmer " + customer);
}
}
log.info(" Invoking R2DBC flux response ");
Flux<Customer> customers = repository.findAll();
customers.subscribe(new CustomerConsumer());
log.info("complete consumer in main thread");
从下面的日志中,我们观察到消费者 运行 在同一个主线程中。 (在 *** 中突出显示)
[***main***] Invoking R2DBC flux response
[***main***] This is a concusmer Customer[id=1, firstName='Jack', lastName='Bauer']
[***main***]This is a concusmer Customer[id=2, firstName='Chloe', lastName='O'Brian']
[***main***] This is a concusmer Customer[id=3, firstName='Michelle', lastName='Dessler']
[***main***] complete consumer in main thread
澄清 :
为什么第一个示例中的 Flux 消费者 运行 在不同的线程中,而基于 R2DBC 的存储库(第二个示例)返回的 Flux 在同一主线程中处理?
Why is the Flux consumer in the first example running in a different thread where as the Flux returned by the R2DBC based repository (second example) is processed in the same main thread?
这里的关键理解是,任何反应式运算符 都可以 切换线程(或更准确地说,调度程序),因为它认为合适。虽然大多数运算符不会切换,但基于时间的运算符必须切换,并且它们将默认使用并行调度程序。
在第一个示例中,您使用的是 delayElements()
运算符。由于它是一个基于时间的运算符,默认情况下,它会切换到并行调度程序,然后 运行s 在并行执行程序上(以及您在日志中看到的并行线程)。基于时间的调度程序必须切换为“即时”调度程序,它会让您的操作保持在同一个线程上,但不能进行基于时间的调度(这是 delayElements
运算符所要求的。)
这并不是说您 必须 使用并行调度程序(如果您有特殊原因不这样做)- 有一个重载可以让您将其设置为您喜欢的任何值。例如,如果您使用 .delayElements(Duration.ofSeconds(1), Schedulers.boundedElastic())
,您会看到您的日志将显示正在使用的有界弹性线程池。
相反,在您的第二个 R2DBC 示例中,没有操作员将其从即时调度程序中切换。正如您从日志中看到的那样,它只会在主线程上 运行。
如果您想更深入地了解线程在反应器中的工作原理,Simon 的 Flux 演讲非常值得一看:https://m.youtube.com/watch?v=sNgTTcG-fEU - 还有一些随附的博客文章。