Spring 5 Web 响应式编程 - WebClient ClassCastException 当从 Spring 流式传输数据的响应式控制器解组 JSON 时
Spring 5 Web Reactive Programming - WebClient ClassCastException when unmarshalling JSON from Spring Reactive Controller that streams data
这个问题与 有关,我在其中询问了如何从响应式 Spring 控制器流式传输数据。
正如罗森指出的那样,我们必须使用 text/event-stream
将流式处理结果作为服务器发送的事件发回,目前一切顺利。
我有这样的服务:
@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream")
public Flux<Alert> getAccountAlertsStreaming() {
return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"),
new Alert((long)2, "Alert message2"),
new Alert((long)3, "Alert message3")})
.delayMillis(1000)
.log();
}
从浏览器调用它,开始接收 3 个结果,延迟 1 秒。
我想从 WebClient 调用此服务并以这种方式实现它:
@Component
public class AccountsServiceClient {
@Autowired
private WebClient webClient;
public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
Flux<Alert> response = webClient
.perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
.extract(bodyStream(Alert.class));
return response;
}
}
这是测试代码:
@Test
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class})
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private AccountsServiceClient client;
public void testNumbersServiceClientStreamingTest() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1);
Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080");
alerts.doOnComplete( () -> {
latch.countDown();
}).subscribe( (n) -> {
logger.info("------------> GOT ALERT {}", n);
});
latch.await();
}
}
问题在于,当客户端尝试提取结果时,HttpMessageReader'
中的 none 可以读取 text/event-stream
+ Alert.class
。
public class ResponseExtractors {
protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders,
ResolvableType responseType, MediaType contentType) {
return messageReaders.stream()
.filter(e -> e.canRead(responseType, contentType))
.findFirst()
.orElseThrow(() ->
new WebClientException(
"Could not decode response body of type '" + contentType
+ "' with target type '" + respons
eType.toString() + "'"));
}
异常:
reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
at reactor.core.Exceptions.bubble(Exceptions.java:97)
at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263)
at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169)
at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161)
at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123)
at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75)
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010)
at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70)
at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71)
at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:877)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader(ResponseExtractors.java:203)
at org.springframework.web.client.reactive.ResponseExtractors$$Lambda/1950155746.get(Unknown Source)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200)
at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181)
at org.springframework.web.client.reactive.ResponseExtractors.lambda$null(ResponseExtractors.java:89)
at org.springframework.web.client.reactive.ResponseExtractors$$Lambda/70386506.apply(Unknown Source)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126)
... 37 common frames omitted
也许这应该由框架自动处理。无论如何,我解决了它自己解组 JSON 流数据:
WebConfigClient:
@Configuration
public class WebClientConfig {
@Bean
public ObjectMapper jacksonObjectMapper(){
return new ObjectMapper();
}
@Bean
public WebClient webClient(){
WebClient webClient = new WebClient(new ReactorClientHttpConnector());
return webClient;
}
}
服务客户端:
@Component
public class AccountsServiceClient {
@Autowired
private WebClient webClient;
@Autowired
private ObjectMapper jacksonObjectMapper;
public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
Flux<Alert> response = webClient
.perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
.extract(bodyStream(String.class))
.map((e -> {
try {
e = e.substring(e.indexOf(":")+1);
Alert a = jacksonObjectMapper.readValue(e, Alert.class);
return a;
} catch (Exception e1) {
e1.printStackTrace();
return null;
}
}));
return response;
}
}
更新: 从 Spring 5 M4 开始,这是由框架完成的。您可以在此处使用新 API 检查解决方案:
这方面已经存在问题。 comment/vote SPR-14539.
这个问题与
正如罗森指出的那样,我们必须使用 text/event-stream
将流式处理结果作为服务器发送的事件发回,目前一切顺利。
我有这样的服务:
@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream")
public Flux<Alert> getAccountAlertsStreaming() {
return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"),
new Alert((long)2, "Alert message2"),
new Alert((long)3, "Alert message3")})
.delayMillis(1000)
.log();
}
从浏览器调用它,开始接收 3 个结果,延迟 1 秒。
我想从 WebClient 调用此服务并以这种方式实现它:
@Component
public class AccountsServiceClient {
@Autowired
private WebClient webClient;
public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
Flux<Alert> response = webClient
.perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
.extract(bodyStream(Alert.class));
return response;
}
}
这是测试代码:
@Test
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class})
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private AccountsServiceClient client;
public void testNumbersServiceClientStreamingTest() throws InterruptedException{
CountDownLatch latch = new CountDownLatch(1);
Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080");
alerts.doOnComplete( () -> {
latch.countDown();
}).subscribe( (n) -> {
logger.info("------------> GOT ALERT {}", n);
});
latch.await();
}
}
问题在于,当客户端尝试提取结果时,HttpMessageReader'
中的 none 可以读取 text/event-stream
+ Alert.class
。
public class ResponseExtractors {
protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders,
ResolvableType responseType, MediaType contentType) {
return messageReaders.stream()
.filter(e -> e.canRead(responseType, contentType))
.findFirst()
.orElseThrow(() ->
new WebClientException(
"Could not decode response body of type '" + contentType
+ "' with target type '" + respons
eType.toString() + "'"));
}
异常:
reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
at reactor.core.Exceptions.bubble(Exceptions.java:97)
at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263)
at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169)
at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161)
at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123)
at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75)
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010)
at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70)
at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71)
at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:877)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader(ResponseExtractors.java:203)
at org.springframework.web.client.reactive.ResponseExtractors$$Lambda/1950155746.get(Unknown Source)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200)
at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181)
at org.springframework.web.client.reactive.ResponseExtractors.lambda$null(ResponseExtractors.java:89)
at org.springframework.web.client.reactive.ResponseExtractors$$Lambda/70386506.apply(Unknown Source)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126)
... 37 common frames omitted
也许这应该由框架自动处理。无论如何,我解决了它自己解组 JSON 流数据:
WebConfigClient:
@Configuration
public class WebClientConfig {
@Bean
public ObjectMapper jacksonObjectMapper(){
return new ObjectMapper();
}
@Bean
public WebClient webClient(){
WebClient webClient = new WebClient(new ReactorClientHttpConnector());
return webClient;
}
}
服务客户端:
@Component
public class AccountsServiceClient {
@Autowired
private WebClient webClient;
@Autowired
private ObjectMapper jacksonObjectMapper;
public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
Flux<Alert> response = webClient
.perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
.extract(bodyStream(String.class))
.map((e -> {
try {
e = e.substring(e.indexOf(":")+1);
Alert a = jacksonObjectMapper.readValue(e, Alert.class);
return a;
} catch (Exception e1) {
e1.printStackTrace();
return null;
}
}));
return response;
}
}
更新: 从 Spring 5 M4 开始,这是由框架完成的。您可以在此处使用新 API 检查解决方案:
这方面已经存在问题。 comment/vote SPR-14539.