Reactor 3 中缺少来自 TopicProcessor 的消息
Missing messages from TopicProcessor in Reactor 3
我是 运行 一个简单的测试,我将消息从 4 个线程发布到 TopicProcessor,而在订阅者中,我只是将它们添加到一个集合中。代码如下:
@Test
public void testProcessingMessages() throws Exception {
int numberOfMessages = 1000;
TopicProcessor<Integer> processor = TopicProcessor.create();
ExecutorService executorService = Executors.newFixedThreadPool(4);
Queue<Integer> messages = new ConcurrentLinkedQueue<>();
processor.subscribe(messages::add);
AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < numberOfMessages; i++) {
executorService.submit(() -> {
processor.onNext(counter.incrementAndGet());
});
}
Thread.sleep(10000);
assertEquals(numberOfMessages, messages.size());
}
但是断言最终失败了,实际消息通常在 980-990 条左右,而不是预期的 1000 条。
我错过了什么吗?
问题是 TopicProcessor.create
创建了一个期望从单个线程发布的处理器。 TopicProcessor.share
从多线程生成时应该使用。
我是 运行 一个简单的测试,我将消息从 4 个线程发布到 TopicProcessor,而在订阅者中,我只是将它们添加到一个集合中。代码如下:
@Test
public void testProcessingMessages() throws Exception {
int numberOfMessages = 1000;
TopicProcessor<Integer> processor = TopicProcessor.create();
ExecutorService executorService = Executors.newFixedThreadPool(4);
Queue<Integer> messages = new ConcurrentLinkedQueue<>();
processor.subscribe(messages::add);
AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < numberOfMessages; i++) {
executorService.submit(() -> {
processor.onNext(counter.incrementAndGet());
});
}
Thread.sleep(10000);
assertEquals(numberOfMessages, messages.size());
}
但是断言最终失败了,实际消息通常在 980-990 条左右,而不是预期的 1000 条。 我错过了什么吗?
问题是 TopicProcessor.create
创建了一个期望从单个线程发布的处理器。 TopicProcessor.share
从多线程生成时应该使用。