Webflux 网络客户端 java.lang.NullPointerException
Webflux WebClient java.lang.NullPointerException
我在将 webflux 网络客户端连接到我的端点时遇到问题。
在我的控制器中,我正在尝试公开 Chocolate DTO 的热流。
当我尝试用邮递员调用端点时,它起作用了。
这句话存在是因为Whosebug要我添加更多文本,因为我的问题中代码太多,不便之处请见谅。
@RestController
public class HotChocolateController {
@GetMapping(value = "/stream/chocolate", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Chocolate> chocolateStream() {
return Flux.interval(Duration.ofMillis(500))
.map(l -> Chocolate.builder()
.id(String.valueOf(System.currentTimeMillis()))
.name("Hot Chocolate")
.build())
.log();
}
}
然后我尝试通过单元测试来测试我的设置:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class ReactiveEndpointTests {
private final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();
@Test
public void testHotChocolate(){
Flux<Chocolate> chocolateSource = client.get().uri("/stream/chocolate").retrieve().bodyToFlux(Chocolate.class);
chocolateSource.subscribe(chocolate -> System.out.println());
}
当我运行这个测试时,我得到以下异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Request to GET http://localhost:8080/stream/chocolate [DefaultWebClient]
Stack trace:
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:315) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel[=13=](PooledConnectionProvider.java:248) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4105) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4211) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4077) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4013) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:201) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:172) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:132) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:336) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:504) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider.lambda$acquire(PooledConnectionProvider.java:169) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe[=13=](HttpClientConnect.java:319) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:99) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:345) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:496) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop(SimplePool.java:206) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:307) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:257) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.AbstractNioChannel.doClose(AbstractNioChannel.java:502) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doClose(NioSocketChannel.java:342) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:759) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.closeAll(NioEventLoop.java:762) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:524) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
你知道为什么会发生这种情况吗?
您刚刚订阅了流并立即退出了测试方法。
您可以将方法更改为类似下面的代码,您将能够看到获取请求的结果:
@Test
public void testHotChocolate(){
Flux<String> chocolateSource =
client.get()
.uri("/stream/chocolate")
.retrieve()
.bodyToFlux(String.class)
.doOnNext(System.out::println);
chocolateSource.blockLast();
}
我在将 webflux 网络客户端连接到我的端点时遇到问题。
在我的控制器中,我正在尝试公开 Chocolate DTO 的热流。 当我尝试用邮递员调用端点时,它起作用了。
这句话存在是因为Whosebug要我添加更多文本,因为我的问题中代码太多,不便之处请见谅。
@RestController
public class HotChocolateController {
@GetMapping(value = "/stream/chocolate", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Chocolate> chocolateStream() {
return Flux.interval(Duration.ofMillis(500))
.map(l -> Chocolate.builder()
.id(String.valueOf(System.currentTimeMillis()))
.name("Hot Chocolate")
.build())
.log();
}
}
然后我尝试通过单元测试来测试我的设置:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class ReactiveEndpointTests {
private final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();
@Test
public void testHotChocolate(){
Flux<Chocolate> chocolateSource = client.get().uri("/stream/chocolate").retrieve().bodyToFlux(Chocolate.class);
chocolateSource.subscribe(chocolate -> System.out.println());
}
当我运行这个测试时,我得到以下异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Request to GET http://localhost:8080/stream/chocolate [DefaultWebClient]
Stack trace:
at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:71) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:315) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel[=13=](PooledConnectionProvider.java:248) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4105) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4211) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4077) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:4013) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:201) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:172) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:132) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:336) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:504) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider.lambda$acquire(PooledConnectionProvider.java:169) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe[=13=](HttpClientConnect.java:319) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onError(FluxRetryPredicate.java:99) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$TcpClientSubscriber.onError(HttpClientConnect.java:345) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onError(PooledConnectionProvider.java:496) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:381) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.internal.shaded.reactor.pool.SimplePool.lambda$drainLoop(SimplePool.java:206) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.core.publisher.LambdaMonoSubscriber.doError(LambdaMonoSubscriber.java:152) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.LambdaMonoSubscriber.onError(LambdaMonoSubscriber.java:147) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:183) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:307) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator$PooledConnectionInitializer.operationComplete(PooledConnectionProvider.java:257) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.AbstractNioChannel.doClose(AbstractNioChannel.java:502) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doClose(NioSocketChannel.java:342) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:759) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.closeAll(NioEventLoop.java:762) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:524) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
你知道为什么会发生这种情况吗?
您刚刚订阅了流并立即退出了测试方法。 您可以将方法更改为类似下面的代码,您将能够看到获取请求的结果:
@Test
public void testHotChocolate(){
Flux<String> chocolateSource =
client.get()
.uri("/stream/chocolate")
.retrieve()
.bodyToFlux(String.class)
.doOnNext(System.out::println);
chocolateSource.blockLast();
}