Project Reactor 的 flatMap 中关于线程的混淆
Confusion about threads in Project Reactor's flatMap
我正在研究 Project Reactor 和反应式 MongoDB 存储库。我有以下代码:
@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
@Id
Integer id;
String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
和主要 @SpringBootApplication
class:
@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {
private final ReactivePersonRepository reactivePersonRepository;
public static void main(String[] args) {
SpringApplication.run(ReactiveDatabaseApplication.class, args);
}
@PostConstruct
public void postConstruct() {
Scheduler single = Schedulers.newSingle("single-scheduler");
IntStream.range(0, 10).forEach(i ->
Flux.just(Person.builder()
.id(i)
.name("PersonName")
.build())
.flatMap(personToSave -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.save(personToSave);
})
//.publishOn(single)
.flatMap(savedPerson -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.findById(savedPerson.getId());
})
//.publishOn(single)
.flatMap(foundPerson -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.deleteById(foundPerson.getId());
})
//.publishOn(single)
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName()))));
}
}
Flux::subscribeOn
方法描述说:
As such, placing this operator anywhere in the chain will also impact
the execution * context of onNext/onError/onComplete signals from
the beginning of the chain up to * the next occurrence of a {@link
publishOn(Scheduler) publishOn}
这让我有点困惑,因为当我在处理链中没有指定任何 publishOn
时,线程名称的打印值为:
Saving person from thread single-scheduler-1 - as expected
Finding person from thread Thread-13
Finding person from thread Thread-6
Finding person from thread Thread-15
Deleting person from thread Thread-6
Deleting person from thread Thread-5
Deleting person from thread Thread-4
我不明白为什么。 subscribeOn
方法中指定的调度程序不应该用于每个 flatMap
执行吗?
当我取消注释 publishOn
行时,一切都由给定的单个调度程序执行,这是预期的。
谁能解释为什么 flatMap
操作不使用单个调度程序,而没有 publishOn
?
这个人为的例子可能会更清楚:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
.flatMap(x -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName())));
这将给出类似于:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
或者,换句话说,您的反应式存储库没有在同一个调度程序上发布,这 就是您看到您所执行的行为的原因。 "Up until the next occurrence of publishOn()
" 并不意味着下一次 您的 代码调用 publishOn()
- 它也可以在您的任何 flatMap()
调用中的任何发布者中,你将无法控制。
我正在研究 Project Reactor 和反应式 MongoDB 存储库。我有以下代码:
@Builder
@FieldDefaults(level = AccessLevel.PRIVATE)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document
public class Person {
@Id
Integer id;
String name;
}
public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, Integer> {
}
和主要 @SpringBootApplication
class:
@SpringBootApplication
@EnableReactiveMongoRepositories
@RequiredArgsConstructor
public class ReactiveDatabaseApplication {
private final ReactivePersonRepository reactivePersonRepository;
public static void main(String[] args) {
SpringApplication.run(ReactiveDatabaseApplication.class, args);
}
@PostConstruct
public void postConstruct() {
Scheduler single = Schedulers.newSingle("single-scheduler");
IntStream.range(0, 10).forEach(i ->
Flux.just(Person.builder()
.id(i)
.name("PersonName")
.build())
.flatMap(personToSave -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.save(personToSave);
})
//.publishOn(single)
.flatMap(savedPerson -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.findById(savedPerson.getId());
})
//.publishOn(single)
.flatMap(foundPerson -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return reactivePersonRepository.deleteById(foundPerson.getId());
})
//.publishOn(single)
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName()))));
}
}
Flux::subscribeOn
方法描述说:
As such, placing this operator anywhere in the chain will also impact the execution * context of onNext/onError/onComplete signals from the beginning of the chain up to * the next occurrence of a {@link publishOn(Scheduler) publishOn}
这让我有点困惑,因为当我在处理链中没有指定任何 publishOn
时,线程名称的打印值为:
Saving person from thread single-scheduler-1 - as expected
Finding person from thread Thread-13
Finding person from thread Thread-6
Finding person from thread Thread-15
Deleting person from thread Thread-6
Deleting person from thread Thread-5
Deleting person from thread Thread-4
我不明白为什么。 subscribeOn
方法中指定的调度程序不应该用于每个 flatMap
执行吗?
当我取消注释 publishOn
行时,一切都由给定的单个调度程序执行,这是预期的。
谁能解释为什么 flatMap
操作不使用单个调度程序,而没有 publishOn
?
这个人为的例子可能会更清楚:
Scheduler single = Schedulers.newSingle("single-scheduler");
Flux.just("Bob")
.flatMap(x -> {
System.out.println(String.format(
"Saving person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Finding person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.flatMap(x -> {
System.out.println(String.format(
"Deleting person from thread %s", Thread.currentThread().getName()));
return Mono.just(x).publishOn(Schedulers.elastic());
})
.subscribeOn(single)
.subscribe(aVoid -> System.out.println(String.format(
"Subscription from thread %s", Thread.currentThread().getName())));
这将给出类似于:
Saving person from thread single-scheduler-1
Finding person from thread elastic-2
Deleting person from thread elastic-3
Subscription from thread elastic-4
或者,换句话说,您的反应式存储库没有在同一个调度程序上发布,这 就是您看到您所执行的行为的原因。 "Up until the next occurrence of publishOn()
" 并不意味着下一次 您的 代码调用 publishOn()
- 它也可以在您的任何 flatMap()
调用中的任何发布者中,你将无法控制。