Webflux 网络客户端 java.lang.NullPointerException

Webflux WebClient java.lang.NullPointerException

我在将 webflux 网络客户端连接到我的端点时遇到问题。

在我的控制器中,我正在尝试公开 Chocolate DTO 的热流。 当我尝试用邮递员调用端点时,它起作用了。

这句话存在是因为Whosebug要我添加更多文本,因为我的问题中代码太多,不便之处请见谅。

@RestController
public class HotChocolateController {

   @GetMapping(value = "/stream/chocolate", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
   public Flux<Chocolate> chocolateStream() {
       return Flux.interval(Duration.ofMillis(500))
               .map(l -> Chocolate.builder()
                    .id(String.valueOf(System.currentTimeMillis()))
                       .name("Hot Chocolate")
                       .build())
               .log();
   }
}

然后我尝试通过单元测试来测试我的设置:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class ReactiveEndpointTests {

    private final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();


    @Test
    public void testHotChocolate(){
        Flux<Chocolate> chocolateSource = client.get().uri("/stream/chocolate").retrieve().bodyToFlux(Chocolate.class);
        chocolateSource.subscribe(chocolate -> System.out.println());
    }

当我运行这个测试时,我得到以下异常:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
    at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Request to GET http://localhost:8080/stream/chocolate [DefaultWebClient]
Stack trace:
        at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:315) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel[=13=](PooledConnectionProvider.java:248) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4105) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4211) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4077) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4013) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:201) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:172) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:132) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:336) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:504) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire(PooledConnectionProvider.java:169) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe[=13=](HttpClientConnect.java:319) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:99) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:345) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:496) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop(SimplePool.java:206) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:307) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:257) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.AbstractNioChannel.doClose(AbstractNioChannel.java:502) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doClose(NioSocketChannel.java:342) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:759) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.NioEventLoop.closeAll(NioEventLoop.java:762) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:524) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

你知道为什么会发生这种情况吗?

您刚刚订阅了流并立即退出了测试方法。 您可以将方法更改为类似下面的代码,您将能够看到获取请求的结果:

    @Test
    public void testHotChocolate(){
        Flux<String> chocolateSource =
                client.get()
                        .uri("/stream/chocolate")
                        .retrieve()
                        .bodyToFlux(String.class)
                        .doOnNext(System.out::println);
        chocolateSource.blockLast();
    }