Project Reactor:多个发布者进行 HTTP 调用,一个订阅者处理所有结果
Project Reactor: Multiple Publishers making HTTP calls and one Subscriber to handle all results
以下代码的问题是订阅者只能看到第一个流量的项目(即只打印 1
)。有趣的是,如果我添加 delayElements
,它工作正常。
这是一个玩具示例,但我打算将其替换为 Flux
发出 HTTP GET 请求并发出其结果(也可能超过两个)。
所以重新表述我的问题,我有一个需要实现的多对一关系。考虑到我的情况,如何实施?你会使用某种处理器吗?
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
});
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
尝试使用 TopicProcessor 实现相同的想法,但遇到相同的问题:
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
TopicProcessor<Integer> processor = TopicProcessor.create();
flux1.subscribe(processor);
flux2.subscribe(processor);
processor.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
您在这里创建了一个没有专用调度程序的无限源,因此它试图在合并之前完全耗尽该源 - 这就是您遇到问题的原因。
这在您的 real-world 用例中可能不是问题,因为 GET
请求的结果大概不会是无限的。然而,如果你想确保结果是交错的,你只需要确保你设置每个通量都有自己的调度程序(通过在每个通量上调用 subscribeOn(Schedulers.elastic());
。)
所以你的例子变成了:
Flux<Integer> flux1 = Flux.<Integer>generate(emitter -> emitter.next(1))
.subscribeOn(Schedulers.elastic());
Flux<Integer> flux2 = Flux.<Integer>generate(emitter -> emitter.next(2))
.subscribeOn(Schedulers.elastic());
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
以下代码的问题是订阅者只能看到第一个流量的项目(即只打印 1
)。有趣的是,如果我添加 delayElements
,它工作正常。
这是一个玩具示例,但我打算将其替换为 Flux
发出 HTTP GET 请求并发出其结果(也可能超过两个)。
所以重新表述我的问题,我有一个需要实现的多对一关系。考虑到我的情况,如何实施?你会使用某种处理器吗?
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
});
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
尝试使用 TopicProcessor 实现相同的想法,但遇到相同的问题:
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
TopicProcessor<Integer> processor = TopicProcessor.create();
flux1.subscribe(processor);
flux2.subscribe(processor);
processor.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
您在这里创建了一个没有专用调度程序的无限源,因此它试图在合并之前完全耗尽该源 - 这就是您遇到问题的原因。
这在您的 real-world 用例中可能不是问题,因为 GET
请求的结果大概不会是无限的。然而,如果你想确保结果是交错的,你只需要确保你设置每个通量都有自己的调度程序(通过在每个通量上调用 subscribeOn(Schedulers.elastic());
。)
所以你的例子变成了:
Flux<Integer> flux1 = Flux.<Integer>generate(emitter -> emitter.next(1))
.subscribeOn(Schedulers.elastic());
Flux<Integer> flux2 = Flux.<Integer>generate(emitter -> emitter.next(2))
.subscribeOn(Schedulers.elastic());
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();