每个订阅者可以从一个 Flowable 获得不同的值吗?
Can each subscriber get different values from one Flowable?
当一些订阅者订阅了一个flowable,这个flowable向订阅者发送items时,每个订阅者可以得到不同的值吗?
例如,如果 flowable 发出
0、1、2、3...
一个观察者获得 0、2,另一个观察者获得 1、3,依此类推,就像负载均衡器一样。
这个解决方案怎么样?
@Test
public void testFlowableLoadBalancer() {
IntStream stream = IntStream.iterate(1, i -> i + 1);
Flowable<Integer> flowable = Flowable.create(e -> stream.forEach(i -> {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
e.onNext(i);
}), BackpressureStrategy.DROP);
ConnectableFlowable<Integer> cf = flowable.publish();
cf.filter(i -> i % 2 == 0).subscribe(i -> {
logger.info("[even] i = {}" , i);
});
cf.filter(i -> i % 2 == 1).subscribe(i -> {
logger.info("[ odd] i = {}" , i);
});
cf.connect();
}
输出:
2016-11-11 18:15:57,884 INFO data.Rx2Test - [ odd] i = 1
2016-11-11 18:15:58,892 INFO data.Rx2Test - [even] i = 2
2016-11-11 18:15:59,895 INFO data.Rx2Test - [ odd] i = 3
2016-11-11 18:16:00,900 INFO data.Rx2Test - [even] i = 4
问题未解决:无法自动平衡,等待其他答案。
ParallelFlowable 更接近我想做的事情。
从 2.0.5 开始可用。
当一些订阅者订阅了一个flowable,这个flowable向订阅者发送items时,每个订阅者可以得到不同的值吗?
例如,如果 flowable 发出
0、1、2、3...
一个观察者获得 0、2,另一个观察者获得 1、3,依此类推,就像负载均衡器一样。
这个解决方案怎么样?
@Test
public void testFlowableLoadBalancer() {
IntStream stream = IntStream.iterate(1, i -> i + 1);
Flowable<Integer> flowable = Flowable.create(e -> stream.forEach(i -> {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
e.onNext(i);
}), BackpressureStrategy.DROP);
ConnectableFlowable<Integer> cf = flowable.publish();
cf.filter(i -> i % 2 == 0).subscribe(i -> {
logger.info("[even] i = {}" , i);
});
cf.filter(i -> i % 2 == 1).subscribe(i -> {
logger.info("[ odd] i = {}" , i);
});
cf.connect();
}
输出:
2016-11-11 18:15:57,884 INFO data.Rx2Test - [ odd] i = 1
2016-11-11 18:15:58,892 INFO data.Rx2Test - [even] i = 2
2016-11-11 18:15:59,895 INFO data.Rx2Test - [ odd] i = 3
2016-11-11 18:16:00,900 INFO data.Rx2Test - [even] i = 4
问题未解决:无法自动平衡,等待其他答案。
ParallelFlowable 更接近我想做的事情。 从 2.0.5 开始可用。