如何在 Netty Reactor HTTP 服务器响应中将 Mono<ResponseEntity> 正确发送为 JSON
How to properly send Mono<ResponseEntity> as JSON in Netty Reactor HTTP Server response
我正在尝试弄清楚如何从 Netty Reactor HTTP Server 以 JSON 的形式正确发送 ResponseEntity 响应。
我当前的实现对来自 WebClient 的请求做出反应,并且应该发回具有某些 ResponseEntity 状态的响应(让我们假设只是 HTTP OK)。
不幸的是,我仍然在客户端收到 InvalidDefinitionException,说由于没有默认构造函数而无法构造实例。
我知道这是什么意思,但是例如 Spring Webflux 也可以有 return 类型的其余端点 Mono,客户端不会出现任何问题。
那么是否有可能在服务器端将实体正确序列化为 JSON 并在客户端将其反序列化?
这是我的客户
import org.springframework.web.reactive.function.client.WebClient;
public Mono<ResponseEntity> postRequest(final Object body, final String uri) {
return webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(body))
.exchange()
.flatMap(clientResponse -> clientResponse.toEntity(ResponseEntity.class));
}
这是我的服务器
public void runWithPost(final String endpointPath, final ServerCallback callback) {
server = HttpServer.create()
.host(this.host)
.port(this.port)
.route(routes ->
routes.post(endpointPath, (request, response) ->
response.addHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.sendString(Mono.just(getJSON(callback.handleCallback())))))
.wiretap(true)
.bindNow();
System.out.println("Starting server...");
}
private String getJSON(final ResponseEntity responseEntity) {
String json = StringUtils.EMPTY;
try {
json = objectMapper.writeValueAsString(responseEntity);
System.out.println("Serialized JSON: " + json);
} catch (final JsonProcessingException ex) {
System.err.println("JSON serializer error: " + ex.getMessage());
}
return json;
}
这是回调
public interface ServerCallback {
ResponseEntity handleCallback();
}
和用法
reactiveRestServer.runWithPost("/transaction", () -> ResponseEntity.ok().build());
不幸的是,在客户端我没有获得 HTTP 状态正常但反序列化异常:
2020-04-28 16:09:35.345 ERROR 15136 --- [ctor-http-nio-2] c.a.t.t.inbound.ArpMessageServiceImpl : Type definition error: [simple type, class org.springframework.http.ResponseEntity]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.http.ResponseEntity` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 2]
2020-04-28 16:09:35.349 WARN 15136 --- [ctor-http-nio-2] io.netty.util.ReferenceCountUtil : Failed to release a message: DefaultLastHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
我错过了什么?
所以我终于解决了这个问题。对于那些将要解决类似问题的人来说,这里就是答案。
问题在于 Spring Webflux 将 ResponseEntity 转换为 DefaultFullHttpResponse,因此 DefaultFullHttpResponse 包含 headers、status 以及 body。我通过完全相同的方法解决了这个问题。
public void runWithPost(final String endpointPath, final ServerCallback callback) {
if (server == null || server.isDisposed()) {
server = HttpServer.create()
.host(this.host)
.port(this.port)
.route(routes ->
routes.post(endpointPath, (request, response) -> processResponse(response, callback)))
.wiretap(true)
.bindNow();
logger.info("Starting server...");
} else {
logger.info("Couldn't start server because one is already running!");
}
}
转换就在这里
private NettyOutbound processResponse(final HttpServerResponse response, final ServerCallback callback) {
final ResponseEntity responseEntity = callback.handleCallback();
// set status
response.status(responseEntity.getStatusCodeValue());
// set headers
final HttpHeaders entityHeaders = responseEntity.getHeaders();
if (!entityHeaders.isEmpty()) {
entityHeaders.entrySet().stream()
.forEach(entry -> response.addHeader(entry.getKey(), buildValue(entry.getValue())));
}
if (responseEntity.hasBody()) {
try {
final Object body = responseEntity.getBody();
if (body instanceof String) {
return response.sendString(Mono.just((String) body));
} else {
return response.send(Mono.just(Unpooled.wrappedBuffer(getBytesFromObject(body))));
}
} catch (final IOException ex) {
response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
return response.sendString(Mono.just(ex.getMessage()));
}
}
// set body
return response.send(Mono.empty());
}
用法如下:
mockReactiveRestServer.runWithPost("/transaction", () -> ResponseEntity.ok().build());
我正在尝试弄清楚如何从 Netty Reactor HTTP Server 以 JSON 的形式正确发送 ResponseEntity 响应。
我当前的实现对来自 WebClient 的请求做出反应,并且应该发回具有某些 ResponseEntity 状态的响应(让我们假设只是 HTTP OK)。
不幸的是,我仍然在客户端收到 InvalidDefinitionException,说由于没有默认构造函数而无法构造实例。
我知道这是什么意思,但是例如 Spring Webflux 也可以有 return 类型的其余端点 Mono,客户端不会出现任何问题。 那么是否有可能在服务器端将实体正确序列化为 JSON 并在客户端将其反序列化?
这是我的客户
import org.springframework.web.reactive.function.client.WebClient;
public Mono<ResponseEntity> postRequest(final Object body, final String uri) {
return webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(body))
.exchange()
.flatMap(clientResponse -> clientResponse.toEntity(ResponseEntity.class));
}
这是我的服务器
public void runWithPost(final String endpointPath, final ServerCallback callback) {
server = HttpServer.create()
.host(this.host)
.port(this.port)
.route(routes ->
routes.post(endpointPath, (request, response) ->
response.addHeader(CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.sendString(Mono.just(getJSON(callback.handleCallback())))))
.wiretap(true)
.bindNow();
System.out.println("Starting server...");
}
private String getJSON(final ResponseEntity responseEntity) {
String json = StringUtils.EMPTY;
try {
json = objectMapper.writeValueAsString(responseEntity);
System.out.println("Serialized JSON: " + json);
} catch (final JsonProcessingException ex) {
System.err.println("JSON serializer error: " + ex.getMessage());
}
return json;
}
这是回调
public interface ServerCallback {
ResponseEntity handleCallback();
}
和用法
reactiveRestServer.runWithPost("/transaction", () -> ResponseEntity.ok().build());
不幸的是,在客户端我没有获得 HTTP 状态正常但反序列化异常:
2020-04-28 16:09:35.345 ERROR 15136 --- [ctor-http-nio-2] c.a.t.t.inbound.ArpMessageServiceImpl : Type definition error: [simple type, class org.springframework.http.ResponseEntity]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.http.ResponseEntity` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 2]
2020-04-28 16:09:35.349 WARN 15136 --- [ctor-http-nio-2] io.netty.util.ReferenceCountUtil : Failed to release a message: DefaultLastHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
我错过了什么?
所以我终于解决了这个问题。对于那些将要解决类似问题的人来说,这里就是答案。 问题在于 Spring Webflux 将 ResponseEntity 转换为 DefaultFullHttpResponse,因此 DefaultFullHttpResponse 包含 headers、status 以及 body。我通过完全相同的方法解决了这个问题。
public void runWithPost(final String endpointPath, final ServerCallback callback) {
if (server == null || server.isDisposed()) {
server = HttpServer.create()
.host(this.host)
.port(this.port)
.route(routes ->
routes.post(endpointPath, (request, response) -> processResponse(response, callback)))
.wiretap(true)
.bindNow();
logger.info("Starting server...");
} else {
logger.info("Couldn't start server because one is already running!");
}
}
转换就在这里
private NettyOutbound processResponse(final HttpServerResponse response, final ServerCallback callback) {
final ResponseEntity responseEntity = callback.handleCallback();
// set status
response.status(responseEntity.getStatusCodeValue());
// set headers
final HttpHeaders entityHeaders = responseEntity.getHeaders();
if (!entityHeaders.isEmpty()) {
entityHeaders.entrySet().stream()
.forEach(entry -> response.addHeader(entry.getKey(), buildValue(entry.getValue())));
}
if (responseEntity.hasBody()) {
try {
final Object body = responseEntity.getBody();
if (body instanceof String) {
return response.sendString(Mono.just((String) body));
} else {
return response.send(Mono.just(Unpooled.wrappedBuffer(getBytesFromObject(body))));
}
} catch (final IOException ex) {
response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
return response.sendString(Mono.just(ex.getMessage()));
}
}
// set body
return response.send(Mono.empty());
}
用法如下:
mockReactiveRestServer.runWithPost("/transaction", () -> ResponseEntity.ok().build());