一个连接中的多个协议
Multiple Protocols in One Connection
我有一个非常奇怪的情况,一个 TCP 服务器基本上维护两个协议。第一个消息对发送 STX|PAYLOAD|ETX 消息和 returns 相同的 STX|PAYLOAD|ETX 消息类型。
另一个消息对发送 STX|PAYLOAD|ETX 消息但只发送 returns |PAYLOAD| (没有 STX/ETX 字节)。
我创建了一个自定义 Serializer/Deserializer 来处理异常,但我收到的错误是在收到连接时发生的(如下面的堆栈跟踪所示)。
出于测试目的在服务器上使用 CTRL 使我能够接收消息。所以,我认为正在发生的是 AbstractConnectionFactory 默认使用 ByteArrayCrLfSerializer。但是,当我尝试使用自定义序列化程序时,连接工厂不知道如何呈现没有 STX/ETX 字节的消息。我需要做的只是阅读回复。请指教...
@Bean
FailoverClientConnectionFactory failoverClientFactory() {
FailoverClientConnectionFactory failoverClientConnectionFactory = new FailoverClientConnectionFactory(underlyingCF());
failoverClientConnectionFactory.setSingleUse(true);
return failoverClientConnectionFactory;
}
@Bean
public List<AbstractClientConnectionFactory> underlyingCF() {
List<AbstractClientConnectionFactory> connections = new ArrayList<AbstractClientConnectionFactory>();
TcpNioClientConnectionFactory primary = new TcpNioClientConnectionFactory(primaryTcpServerHost, primaryTcpServerPort);
primary.setSerializer(new ByteArrayClientSerializer());
primary.setDeserializer(new ByteArrayClientSerializer());
primary.setSingleUse(true);
log.info("Starting with Primary Server/Port as: {}:{}", primaryTcpServerHost, primaryTcpServerPort);
TcpNioClientConnectionFactory failover = new TcpNioClientConnectionFactory(secondaryTcpServerHost, secondaryTcpServerPort);
failover.setSerializer(new ByteArrayClientSerializer());
primary.setDeserializer(new ByteArrayClientSerializer());
failover.setSingleUse(true);
log.info("Starting with Secondary Server/Port as: {}:{}", secondaryTcpServerHost, secondaryTcpServerPort);
connections.add(primary);
connections.add(failover);
return connections;
}
@Bean
@DependsOn("failoverClientFactory")
public IntegrationFlow liveMumClient() {
return IntegrationFlows.from(Gate.class)
.handle(Tcp.outboundGateway(failoverClientFactory()))
.transform(Transformers.objectToString())
.get();
}
public interface Gate {
// TODO: Use properties for 20000 seems to be unsupported
@Gateway(replyTimeout = 5000)
String sendAndReceive(byte[] out);
}
}
private Deserializer<?> deserializer = new ByteArrayCrLfSerializer();
private static final byte[] CRLF = "\r\n".getBytes();
@Override
public void serialize(byte[] bytes, OutputStream outputStream)
throws IOException {
outputStream.write(bytes);
outputStream.write(CRLF); // DOESN'T WORK WHEN REMOVED AND I NEED TO READ RAW BYTES IN THIS CASE.
}
客户端序列化程序
@Slf4j
public class ByteArrayClientSerializer extends AbstractPooledBufferByteArraySerializer {
/**
* A single reusable instance.
*/
public static final ByteArrayLiveMumSerializer INSTANCE = new ByteArrayLiveMumSerializer();
public static final int STX = 0x02;
public static final int ETX = 0x03;
/**
* Reads the data in the inputStream to a byte[]. Data must be prefixed
* with an ASCII STX character, and terminated with an ASCII ETX character.
* Throws a {@link SoftEndOfStreamException} if the stream
* is closed immediately before the STX (i.e. no data is in the process of
* being read).
*
*/
@Override
public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
int bite = inputStream.read();
if (bite < 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
int n = 0;
try {
if (bite == STX) {
while ((bite = inputStream.read()) != ETX) {
checkClosure(bite);
buffer[n++] = (byte) bite;
if (n >= getMaxMessageSize()) {
throw new IOException("ETX not found before max message length: "
+ getMaxMessageSize());
}
}
}
else {
while (bite >= 0) {
try {
bite = inputStream.read();
}
catch (SocketTimeoutException e) {
bite = -1;
}
if (bite < 0) {
if (n == 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
break;
}
if (n >= getMaxMessageSize()) {
throw new IOException("Socket was not closed before max message length: "
+ getMaxMessageSize());
}
buffer[n++] = (byte) bite;
}
byte[] bytes = copyToSizedArray(buffer, n);
System.out.println("Deserialized: " + new String(bytes));
}
return copyToSizedArray(buffer, n);
}
catch (IOException e) {
publishEvent(e, buffer, n);
throw e;
}
catch (RuntimeException e) {
publishEvent(e, buffer, n);
throw e;
}
}
/**
* Writes the byte[] to the stream, prefixed by an ASCII STX character and
* terminated with an ASCII ETX character.
*/
@Override
public void serialize(byte[] bytes, OutputStream os) throws IOException {
os.write(STX);
os.write(bytes);
os.write(ETX);
}
}
org.springframework.messaging.MessagingException: Exception while awaiting reply; nested exception is java.io.EOFException: Connection is closed
at org.springframework.integration.ip.tcp.TcpOutboundGateway$AsyncReply.doThrowErrorMessagePayload(TcpOutboundGateway.java:419) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.ip.tcp.TcpOutboundGateway$AsyncReply.getReply(TcpOutboundGateway.java:408) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.ip.tcp.TcpOutboundGateway.getReply(TcpOutboundGateway.java:209) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:161) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:233) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:96) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:86) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:495) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:469) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:564) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:489) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:464) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:453) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at com.sun.proxy.$Proxy74.sendAndReceive(Unknown Source) [na:na]
at gov.nyc.mumweb.service.MumWebService.askMum(MumWebService.java:60) [classes/:na]
at gov.nyc.mumweb.controller.LiveMumController.recommendationService(LiveMumController.java:27) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_141]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_141]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_141]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_141]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:888) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:526) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:861) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1579) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.27.jar:9.0.27]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.27.jar:9.0.27]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]
Caused by: java.io.EOFException: Connection is closed
at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.lambda$processNioSelections[=14=](AbstractConnectionFactory.java:722) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
... 1 common frames omitted
删除调试消息消除了错误。呃!
System.out.println("Deserialized: " + new String(bytes));
我有一个非常奇怪的情况,一个 TCP 服务器基本上维护两个协议。第一个消息对发送 STX|PAYLOAD|ETX 消息和 returns 相同的 STX|PAYLOAD|ETX 消息类型。
另一个消息对发送 STX|PAYLOAD|ETX 消息但只发送 returns |PAYLOAD| (没有 STX/ETX 字节)。
我创建了一个自定义 Serializer/Deserializer 来处理异常,但我收到的错误是在收到连接时发生的(如下面的堆栈跟踪所示)。
出于测试目的在服务器上使用 CTRL 使我能够接收消息。所以,我认为正在发生的是 AbstractConnectionFactory 默认使用 ByteArrayCrLfSerializer。但是,当我尝试使用自定义序列化程序时,连接工厂不知道如何呈现没有 STX/ETX 字节的消息。我需要做的只是阅读回复。请指教...
@Bean
FailoverClientConnectionFactory failoverClientFactory() {
FailoverClientConnectionFactory failoverClientConnectionFactory = new FailoverClientConnectionFactory(underlyingCF());
failoverClientConnectionFactory.setSingleUse(true);
return failoverClientConnectionFactory;
}
@Bean
public List<AbstractClientConnectionFactory> underlyingCF() {
List<AbstractClientConnectionFactory> connections = new ArrayList<AbstractClientConnectionFactory>();
TcpNioClientConnectionFactory primary = new TcpNioClientConnectionFactory(primaryTcpServerHost, primaryTcpServerPort);
primary.setSerializer(new ByteArrayClientSerializer());
primary.setDeserializer(new ByteArrayClientSerializer());
primary.setSingleUse(true);
log.info("Starting with Primary Server/Port as: {}:{}", primaryTcpServerHost, primaryTcpServerPort);
TcpNioClientConnectionFactory failover = new TcpNioClientConnectionFactory(secondaryTcpServerHost, secondaryTcpServerPort);
failover.setSerializer(new ByteArrayClientSerializer());
primary.setDeserializer(new ByteArrayClientSerializer());
failover.setSingleUse(true);
log.info("Starting with Secondary Server/Port as: {}:{}", secondaryTcpServerHost, secondaryTcpServerPort);
connections.add(primary);
connections.add(failover);
return connections;
}
@Bean
@DependsOn("failoverClientFactory")
public IntegrationFlow liveMumClient() {
return IntegrationFlows.from(Gate.class)
.handle(Tcp.outboundGateway(failoverClientFactory()))
.transform(Transformers.objectToString())
.get();
}
public interface Gate {
// TODO: Use properties for 20000 seems to be unsupported
@Gateway(replyTimeout = 5000)
String sendAndReceive(byte[] out);
}
}
private Deserializer<?> deserializer = new ByteArrayCrLfSerializer();
private static final byte[] CRLF = "\r\n".getBytes();
@Override
public void serialize(byte[] bytes, OutputStream outputStream)
throws IOException {
outputStream.write(bytes);
outputStream.write(CRLF); // DOESN'T WORK WHEN REMOVED AND I NEED TO READ RAW BYTES IN THIS CASE.
}
客户端序列化程序
@Slf4j
public class ByteArrayClientSerializer extends AbstractPooledBufferByteArraySerializer {
/**
* A single reusable instance.
*/
public static final ByteArrayLiveMumSerializer INSTANCE = new ByteArrayLiveMumSerializer();
public static final int STX = 0x02;
public static final int ETX = 0x03;
/**
* Reads the data in the inputStream to a byte[]. Data must be prefixed
* with an ASCII STX character, and terminated with an ASCII ETX character.
* Throws a {@link SoftEndOfStreamException} if the stream
* is closed immediately before the STX (i.e. no data is in the process of
* being read).
*
*/
@Override
public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {
int bite = inputStream.read();
if (bite < 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
int n = 0;
try {
if (bite == STX) {
while ((bite = inputStream.read()) != ETX) {
checkClosure(bite);
buffer[n++] = (byte) bite;
if (n >= getMaxMessageSize()) {
throw new IOException("ETX not found before max message length: "
+ getMaxMessageSize());
}
}
}
else {
while (bite >= 0) {
try {
bite = inputStream.read();
}
catch (SocketTimeoutException e) {
bite = -1;
}
if (bite < 0) {
if (n == 0) {
throw new SoftEndOfStreamException("Stream closed between payloads");
}
break;
}
if (n >= getMaxMessageSize()) {
throw new IOException("Socket was not closed before max message length: "
+ getMaxMessageSize());
}
buffer[n++] = (byte) bite;
}
byte[] bytes = copyToSizedArray(buffer, n);
System.out.println("Deserialized: " + new String(bytes));
}
return copyToSizedArray(buffer, n);
}
catch (IOException e) {
publishEvent(e, buffer, n);
throw e;
}
catch (RuntimeException e) {
publishEvent(e, buffer, n);
throw e;
}
}
/**
* Writes the byte[] to the stream, prefixed by an ASCII STX character and
* terminated with an ASCII ETX character.
*/
@Override
public void serialize(byte[] bytes, OutputStream os) throws IOException {
os.write(STX);
os.write(bytes);
os.write(ETX);
}
}
org.springframework.messaging.MessagingException: Exception while awaiting reply; nested exception is java.io.EOFException: Connection is closed
at org.springframework.integration.ip.tcp.TcpOutboundGateway$AsyncReply.doThrowErrorMessagePayload(TcpOutboundGateway.java:419) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.ip.tcp.TcpOutboundGateway$AsyncReply.getReply(TcpOutboundGateway.java:408) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.ip.tcp.TcpOutboundGateway.getReply(TcpOutboundGateway.java:209) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:161) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:233) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:96) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:86) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:495) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:469) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:564) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:489) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:464) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:453) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at com.sun.proxy.$Proxy74.sendAndReceive(Unknown Source) [na:na]
at gov.nyc.mumweb.service.MumWebService.askMum(MumWebService.java:60) [classes/:na]
at gov.nyc.mumweb.controller.LiveMumController.recommendationService(LiveMumController.java:27) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_141]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_141]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_141]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_141]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:888) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:526) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:861) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1579) [tomcat-embed-core-9.0.27.jar:9.0.27]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.27.jar:9.0.27]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.27.jar:9.0.27]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]
Caused by: java.io.EOFException: Connection is closed
at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.lambda$processNioSelections[=14=](AbstractConnectionFactory.java:722) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
... 1 common frames omitted
删除调试消息消除了错误。呃!
System.out.println("Deserialized: " + new String(bytes));