Reactor 项目中的运算符 "publishOn()" 和线程亲和性
Operator "publishOn()" and thread affinity in Reactor Project
我无法从文档中清楚地了解 publishOn()
(或 observeOn()
在 RxJava 的情况下)运算符在线程亲和性方面的工作原理。我以为这个运算符保证任何订阅者都将在同一个线程中处理,但下面的例子打破了我的理解:
Flux<String> started = ep.publishOn(scheduler);
Flux<String> afterStateUpdate = started.doOnNext(e -> {
System.out.println("STATE UPDATE : " + Thread.currentThread().getId());
state.add(e);
}).share();
afterStateUpdate.subscribe();
ep.onNext("1");
ep.onNext("2");
ep.onNext("3");
afterStateUpdate.subscribe(e -> {
System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId());
});
ep.onNext("4");
ep.onNext("5");
ep.onNext("6");
结果我看到以下输出:
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
这意味着 "state updater" 一直在线程 12 中工作,但是当第二个订阅者订阅时 "state updater" 开始在线程 13 中工作。
所以,问题是在这种情况下我如何保证订阅者的线程亲和性?
我发现只有一种方法可以 100% 确定一个订阅者使用同一个线程。它是创建一个像这样的单一调度器池:
Scheduler[] schedulers = new Schedulers[10];
for (int i = 0; i < 10; i++) {
schedulers[i] = Schedulers.newSingle();
}
public Flux<T> wrapIntoSingleThread(Scheduler scheduler) {
return this.flux.publishOn(scheduler);
}
然后为每个订阅者从这个池中取出一个调度器:wrapIntoSingleThread(schedulers[i++ % schedulers.length]).subscribe(...)
如果有人知道提供线程关联的其他方式 - 请纠正我。
我无法从文档中清楚地了解 publishOn()
(或 observeOn()
在 RxJava 的情况下)运算符在线程亲和性方面的工作原理。我以为这个运算符保证任何订阅者都将在同一个线程中处理,但下面的例子打破了我的理解:
Flux<String> started = ep.publishOn(scheduler);
Flux<String> afterStateUpdate = started.doOnNext(e -> {
System.out.println("STATE UPDATE : " + Thread.currentThread().getId());
state.add(e);
}).share();
afterStateUpdate.subscribe();
ep.onNext("1");
ep.onNext("2");
ep.onNext("3");
afterStateUpdate.subscribe(e -> {
System.out.println("MAIN SUBSCRIBER : " + Thread.currentThread().getId());
});
ep.onNext("4");
ep.onNext("5");
ep.onNext("6");
结果我看到以下输出:
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 12
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
STATE UPDATE : 13
MAIN SUBSCRIBER : 13
这意味着 "state updater" 一直在线程 12 中工作,但是当第二个订阅者订阅时 "state updater" 开始在线程 13 中工作。
所以,问题是在这种情况下我如何保证订阅者的线程亲和性?
我发现只有一种方法可以 100% 确定一个订阅者使用同一个线程。它是创建一个像这样的单一调度器池:
Scheduler[] schedulers = new Schedulers[10];
for (int i = 0; i < 10; i++) {
schedulers[i] = Schedulers.newSingle();
}
public Flux<T> wrapIntoSingleThread(Scheduler scheduler) {
return this.flux.publishOn(scheduler);
}
然后为每个订阅者从这个池中取出一个调度器:wrapIntoSingleThread(schedulers[i++ % schedulers.length]).subscribe(...)
如果有人知道提供线程关联的其他方式 - 请纠正我。