Mono.subscribe(consumer, errorConsumer, completeConsumer, subscriptionConsumer) 没有调用consumer和completeConsumer?
Mono.subscribe(consumer, errorConsumer, completeConsumer, subscriptionConsumer) does not invoke consumer and completeConsumer?
作为主题,Mono.subscribe
的第 4 个变体似乎没有调用成功消费者和完整消费者。它只调用订阅消费者。
下面的代码失败了
CountDownLatch latch = new CountDownLatch(3);
Mono.just(1).subscribe(i -> latch.countDown(), throwable -> {}, latch::countDown, s -> latch.countDown());
boolean success = latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals(true, success);
那是因为带有 Consumer<Subscription>
的 subscribe
版本是为了让您驱动最初的 request
。如果您不在消费者中的订阅上调用 request(n)
,则不会发出任何数据并且 Mono
将无法完成...
这按预期工作:
CountDownLatch latch = new CountDownLatch(3);
Mono.just(1).subscribe(
i -> latch.countDown(),
throwable -> {},
latch::countDown,
s -> {
s.request(Long.MAX_VALUE);
latch.countDown();
});
boolean success = latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals(true, success);
作为主题,Mono.subscribe
的第 4 个变体似乎没有调用成功消费者和完整消费者。它只调用订阅消费者。
下面的代码失败了
CountDownLatch latch = new CountDownLatch(3);
Mono.just(1).subscribe(i -> latch.countDown(), throwable -> {}, latch::countDown, s -> latch.countDown());
boolean success = latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals(true, success);
那是因为带有 Consumer<Subscription>
的 subscribe
版本是为了让您驱动最初的 request
。如果您不在消费者中的订阅上调用 request(n)
,则不会发出任何数据并且 Mono
将无法完成...
这按预期工作:
CountDownLatch latch = new CountDownLatch(3);
Mono.just(1).subscribe(
i -> latch.countDown(),
throwable -> {},
latch::countDown,
s -> {
s.request(Long.MAX_VALUE);
latch.countDown();
});
boolean success = latch.await(1, TimeUnit.SECONDS);
Assert.assertEquals(true, success);