了解缓冲区操作的工作原理
Understanding how buffer operation works
Flux.interval(Duration.ofMillis(100))
.map(i -> 0)
.buffer(Duration.ofMillis(200))
.filter(list1 -> list1.size() != 2)
.subscribe(System.out::println,
Throwable::printStackTrace, System.out::println);
以上代码打印:(至少在第一分钟):[0]
.
为什么?我根本没想到会有任何打印。
.interval
运算符只有一个保证,即在每个指定的时间间隔内,将安排生成 onNext
事件的任务。
遗憾的是,无法保证预定任务会在提交后立即执行。
这是因为底层 Scheduler
的未确定行为。
Flux.interval
换句话说就是安排一些事件并随时间重复它的运算符。这里的问题是,任何计划的操作都依赖于调用计划该操作的 Thread
。在 Reactor 的情况下,它是一组线程或换句话说 ThreadPool(在 Reactor 世界中它是 Scheduler
)。这里的问题是操作的执行可能会晚一点(但不会更早),不能保证这一点,因为在 Thread 的情况下,底层系统调度程序有可能将执行时间交给另一个 Thread 或另一个执行,所以专用 Thread
会饿死 CPU 个周期,或者在 ThreadPool 的情况下有一个 ThreadPool 的队列,反过来有可能会有另一个更重要的任务将更早执行。
这意味着间隔将晚。反过来,这意味着缓冲区中有3个任务的机会。 interval
保证的是间隔任务将在每个指定的时间间隔(在您的情况下为 200 毫秒)
Flux.interval(Duration.ofMillis(100))
.map(i -> 0)
.buffer(Duration.ofMillis(200))
.filter(list1 -> list1.size() != 2)
.subscribe(System.out::println,
Throwable::printStackTrace, System.out::println);
以上代码打印:(至少在第一分钟):[0]
.
为什么?我根本没想到会有任何打印。
.interval
运算符只有一个保证,即在每个指定的时间间隔内,将安排生成 onNext
事件的任务。
遗憾的是,无法保证预定任务会在提交后立即执行。
这是因为底层 Scheduler
的未确定行为。
Flux.interval
换句话说就是安排一些事件并随时间重复它的运算符。这里的问题是,任何计划的操作都依赖于调用计划该操作的 Thread
。在 Reactor 的情况下,它是一组线程或换句话说 ThreadPool(在 Reactor 世界中它是 Scheduler
)。这里的问题是操作的执行可能会晚一点(但不会更早),不能保证这一点,因为在 Thread 的情况下,底层系统调度程序有可能将执行时间交给另一个 Thread 或另一个执行,所以专用 Thread
会饿死 CPU 个周期,或者在 ThreadPool 的情况下有一个 ThreadPool 的队列,反过来有可能会有另一个更重要的任务将更早执行。
这意味着间隔将晚。反过来,这意味着缓冲区中有3个任务的机会。 interval
保证的是间隔任务将在每个指定的时间间隔(在您的情况下为 200 毫秒)