Reactor 3 中缺少来自 TopicProcessor 的消息

Missing messages from TopicProcessor in Reactor 3

我是 运行 一个简单的测试,我将消息从 4 个线程发布到 TopicProcessor,而在订阅者中,我只是将它们添加到一个集合中。代码如下:

@Test
public void testProcessingMessages() throws Exception {
    int numberOfMessages = 1000;

    TopicProcessor<Integer> processor = TopicProcessor.create();

    ExecutorService executorService = Executors.newFixedThreadPool(4);

    Queue<Integer> messages = new ConcurrentLinkedQueue<>();

    processor.subscribe(messages::add);

    AtomicInteger counter = new AtomicInteger(0);
    for (int i = 0; i < numberOfMessages; i++) {
        executorService.submit(() -> {
            processor.onNext(counter.incrementAndGet());
        });
    }

    Thread.sleep(10000);

    assertEquals(numberOfMessages, messages.size());
}

但是断言最终失败了,实际消息通常在 980-990 条左右,而不是预期的 1000 条。 我错过了什么吗?

问题是 TopicProcessor.create 创建了一个期望从单个线程发布的处理器。 TopicProcessor.share 从多线程生成时应该使用。