Reactor 项目中的运算符 "publishOn()" 和线程亲和性

Operator "publishOn()" and thread affinity in Reactor Project

我无法从文档中清楚地了解 publishOn()(或 observeOn() 在 RxJava 的情况下)运算符在线程亲和性方面的工作原理。我以为这个运算符保证任何订阅者都将在同一个线程中处理,但下面的例子打破了我的理解:

Flux<String> started = ep.publishOn(scheduler);
Flux<String> afterStateUpdate = started.doOnNext(e -> {
    System.out.println("STATE UPDATE : " + Thread.currentThread().getId());
    state.add(e);
}).share();
afterStateUpdate.subscribe();
ep.onNext("1");
ep.onNext("2");
ep.onNext("3");

afterStateUpdate.subscribe(e -> {
    System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId());
});
ep.onNext("4");
ep.onNext("5");
ep.onNext("6");

结果我看到以下输出:

STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13

这意味着 "state updater" 一直在线程 12 中工作,但是当第二个订阅者订阅时 "state updater" 开始在线程 13 中工作。

所以,问题是在这种情况下我如何保证订阅者的线程亲和性?

我发现只有一种方法可以 100% 确定一个订阅者使用同一个线程。它是创建一个像这样的单一调度器池:

Scheduler[] schedulers = new Schedulers[10];
for (int i = 0; i < 10; i++) {
  schedulers[i] = Schedulers.newSingle();
}

public Flux<T> wrapIntoSingleThread(Scheduler scheduler) {
  return this.flux.publishOn(scheduler);
}

然后为每个订阅者从这个池中取出一个调度器:wrapIntoSingleThread(schedulers[i++ % schedulers.length]).subscribe(...)

如果有人知道提供线程关联的其他方式 - 请纠正我。