Confusion about threads in Project Reactor's flatMap

I am studying Project Reactor and reactive MongoDB repositories. I have the following code:

@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
    @Id
    Integer id;
    String name;
}

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}

And the main @SpringBootApplication class:

@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {

    private final ReactivePersonRepository reactivePersonRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactiveDatabaseApplication.class, args);
    }

    @PostConstruct
    public void postConstruct() {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        IntStream.range(0, 10).forEach(i ->
                Flux.just(Person.builder()
                        .id(i)
                        .name("PersonName")
                        .build())
                        .flatMap(personToSave -> {
                            System.out.println(String.format(
                                    "Saving person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.save(personToSave);
                        })
                        //.publishOn(single)
                        .flatMap(savedPerson -> {
                            System.out.println(String.format(
                                    "Finding person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.findById(savedPerson.getId());
                        })
                        //.publishOn(single)
                        .flatMap(foundPerson -> {
                            System.out.println(String.format(
                                    "Deleting person from thread %s", Thread.currentThread().getName()));
                            return reactivePersonRepository.deleteById(foundPerson.getId());
                        })
                        //.publishOn(single)
                        .subscribeOn(single)
                        .subscribe(aVoid -> System.out.println(String.format(
                                "Subscription from thread %s", Thread.currentThread().getName()))));
    }
}

The Flux::subscribeOn method description says:

As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

This confuses me a little, because when I don't specify any publishOn in the processing chain, the printed thread names are:

Saving person from thread single-scheduler-1 - as expected

Finding person from thread Thread-13

Finding person from thread Thread-6

Finding person from thread Thread-15

Deleting person from thread Thread-6

Deleting person from thread Thread-5

Deleting person from thread Thread-4

I don't understand why. Shouldn't the scheduler specified in subscribeOn be used for every flatMap execution?

When I uncomment the publishOn lines, everything is executed by the given single scheduler, which is what I expect.

Can anyone explain why the flatMap operations don't use the single scheduler when there is no publishOn?

This contrived example might make it clearer:

Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
        .flatMap(x -> {
            System.out.println(String.format(
                    "Saving person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Finding person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .flatMap(x -> {
            System.out.println(String.format(
                    "Deleting person from thread %s", Thread.currentThread().getName()));
            return Mono.just(x).publishOn(Schedulers.elastic());
        })
        .subscribeOn(single)
        .subscribe(aVoid -> System.out.println(String.format(
                "Subscription from thread %s", Thread.currentThread().getName())));

This will print something like:

Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4

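Or, in other words, your reactive repository isn't publishing on the same scheduler, and that is why you see the behaviour you do. "Up until the next occurrence of publishOn()" doesn't mean the next time your code calls publishOn(). It can just as well be a publishOn() inside any of the publishers returned from any of your flatMap() calls, which you have no control over.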
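
If you do want each step back on your own scheduler, the publishOn calls commented out in the question are the way to get it. Here is a minimal sketch that records which thread each flatMap lambda runs on, with and without a publishOn(single) between the steps. It assumes reactor-core is on the classpath. The repositoryCall helper and the "driver" scheduler are hypothetical stand-ins for a real repository emitting on its own threads; they are not part of Spring Data.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnAfterFlatMap {

    // Hypothetical stand-in for a repository call: like a driver-backed
    // publisher, it emits its result on a thread the caller did not choose.
    static Mono<String> repositoryCall(String value, Scheduler driver) {
        return Mono.just(value).publishOn(driver);
    }

    // Runs save -> find and returns the thread name seen by each flatMap lambda.
    static List<String> run(boolean hopBack) {
        Scheduler single = Schedulers.newSingle("single-scheduler");
        // Assumption: a small pool playing the role of the driver's own threads.
        Scheduler driver = Schedulers.newParallel("driver", 2);
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            Flux<String> saved = Flux.just("Bob").flatMap(x -> {
                seen.add("save:" + Thread.currentThread().getName());
                return repositoryCall(x, driver);
            });
            // Optionally hop back to our own scheduler before the next step.
            Flux<String> next = hopBack ? saved.publishOn(single) : saved;
            next.flatMap(x -> {
                    seen.add("find:" + Thread.currentThread().getName());
                    return repositoryCall(x, driver);
                })
                .subscribeOn(single)
                // Block only so this demo can return its result.
                .blockLast(Duration.ofSeconds(5));
        } finally {
            single.dispose();
            driver.dispose();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("without publishOn: " + run(false));
        System.out.println("with publishOn:    " + run(true));
    }
}
```

In both runs the "save" lambda should report a single-scheduler thread, because subscribeOn governs the start of the chain. Without the hop, the "find" lambda should report a driver thread, since it runs on whatever thread the inner publisher emitted on. With publishOn(single) in between, it should report a single-scheduler thread again.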