新线程上的通量间隔滴答

Flux interval ticks on new threads

我想使用 Flux 进行连续 运行 作业,它调用服务并处理其响应。处理速度可能比服务调用的频率慢。我有一个示例代码可以做到这一点,但它没有达到我想要实现的目的。

Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
    .onBackpressureDrop()
    .doOnNext(counter -> something())
    .onErrorContinue(...)
    .doOnComplete(...)
    .subscribe();

所以问题是如果滴答每秒发生一次,但是 something() 需要 5 秒才能完成,那么每 5 秒调用 something() 而不是每 1 秒调用一次。如何修改每个 something() 获得自己的线程(来自有界线程池)的代码?我已经测试过每个订阅者都有一个专用线程,所以我可以增加对多个(固定)订阅者的处理,但我想让它更具动态性。

谢谢!

怎么样

        Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
                .onBackpressureDrop()
                .flatMap(counter -> Mono.fromCallable(() -> {
                    something();
                    return counter; // simply to justify the callable, in case something has return type void.
                }).subscribeOn(Schedulers.parallel()))
                .onErrorContinue(...)
                .doOnComplete(...)
                .subscribe();

subscribeOn(Schedulers.parallel()) 将在无界线程池上执行 Mono。它还将允许您通过更多操作进一步继续流。

我进行了更多调查,得出了以下代码:

Flux.interval(Duration.ZERO, Duration.ofSeconds(1), Schedulers.boundedElastic())
    .onBackpressureDrop()
    .parallel()
    .runOn(Schedulers.boundedElastic())
    .doOnNext(counter -> something())
    .sequential()
    .onErrorContinue(...)
    .doOnComplete(...)
    .subscribe();

好像它做了我想要的。