新线程上的通量间隔滴答
Flux interval ticks on new threads
我想使用 Flux 进行连续 运行 作业,它调用服务并处理其响应。处理速度可能比服务调用的频率慢。我有一个示例代码可以做到这一点,但它没有达到我想要实现的目的。
Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
.onBackpressureDrop()
.doOnNext(counter -> something())
.onErrorContinue(...)
.doOnComplete(...)
.subscribe();
所以问题是如果滴答每秒发生一次,但是 something() 需要 5 秒才能完成,那么每 5 秒调用 something() 而不是每 1 秒调用一次。如何修改每个 something() 获得自己的线程(来自有界线程池)的代码?我已经测试过每个订阅者都有一个专用线程,所以我可以增加对多个(固定)订阅者的处理,但我想让它更具动态性。
谢谢!
怎么样
Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
.onBackpressureDrop()
.flatMap(counter -> Mono.fromCallable(() -> {
something();
return counter; // simply to justify the callable, in case something has return type void.
}).subscribeOn(Schedulers.parallel()))
.onErrorContinue(...)
.doOnComplete(...)
.subscribe();
subscribeOn(Schedulers.parallel())
将在无界线程池上执行 Mono。它还将允许您通过更多操作进一步继续流。
我进行了更多调查,得出了以下代码:
Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
.onBackpressureDrop()
.parallel()
.runOn(Schedulers.boundedElastic())
.doOnNext(counter -> something())
.sequential()
.onErrorContinue(...)
.doOnComplete(...)
.subscribe();
好像它做了我想要的。
我想使用 Flux 进行连续 运行 作业,它调用服务并处理其响应。处理速度可能比服务调用的频率慢。我有一个示例代码可以做到这一点,但它没有达到我想要实现的目的。
Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
.onBackpressureDrop()
.doOnNext(counter -> something())
.onErrorContinue(...)
.doOnComplete(...)
.subscribe();
所以问题是如果滴答每秒发生一次,但是 something() 需要 5 秒才能完成,那么每 5 秒调用 something() 而不是每 1 秒调用一次。如何修改每个 something() 获得自己的线程(来自有界线程池)的代码?我已经测试过每个订阅者都有一个专用线程,所以我可以增加对多个(固定)订阅者的处理,但我想让它更具动态性。
谢谢!
怎么样
Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
.onBackpressureDrop()
.flatMap(counter -> Mono.fromCallable(() -> {
something();
return counter; // simply to justify the callable, in case something has return type void.
}).subscribeOn(Schedulers.parallel()))
.onErrorContinue(...)
.doOnComplete(...)
.subscribe();
subscribeOn(Schedulers.parallel())
将在无界线程池上执行 Mono。它还将允许您通过更多操作进一步继续流。
我进行了更多调查,得出了以下代码:
Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
.onBackpressureDrop()
.parallel()
.runOn(Schedulers.boundedElastic())
.doOnNext(counter -> something())
.sequential()
.onErrorContinue(...)
.doOnComplete(...)
.subscribe();
好像它做了我想要的。