一个连接中的多个协议

Multiple Protocols in One Connection

我遇到一个非常奇怪的情况:一个 TCP 服务器实际上同时维护两种协议。第一种消息对中,客户端发送 STX|PAYLOAD|ETX 消息,服务器返回相同的 STX|PAYLOAD|ETX 消息类型。

另一种消息对中,客户端发送 STX|PAYLOAD|ETX 消息,但服务器只返回 |PAYLOAD|(没有 STX/ETX 字节)。

我创建了一个自定义 Serializer/Deserializer 来处理异常,但我收到的错误是在收到连接时发生的(如下面的堆栈跟踪所示)。

出于测试目的,在服务器端使用 CRLF 使我能够接收消息。所以,我认为实际情况是 AbstractConnectionFactory 默认使用 ByteArrayCrLfSerializer。但是,当我尝试使用自定义序列化程序时,连接工厂不知道如何处理没有 STX/ETX 字节的消息。我需要做的只是读取回复。请指教...

    /**
     * Creates a {@link FailoverClientConnectionFactory} over the connection
     * factories returned by {@link #underlyingCF()} and marks it single-use.
     *
     * @return the configured failover client connection factory
     */
    @Bean
    FailoverClientConnectionFactory failoverClientFactory() {
        List<AbstractClientConnectionFactory> delegates = underlyingCF();
        FailoverClientConnectionFactory factory = new FailoverClientConnectionFactory(delegates);
        factory.setSingleUse(true);
        return factory;
    }

    /**
     * Builds the two NIO client connection factories (primary first, then
     * failover) that are handed to the failover connection factory.
     *
     * <p>Both factories are single-use and both use
     * {@link ByteArrayClientSerializer} as serializer and as deserializer.
     *
     * @return the list of connection factories, primary at index 0
     */
    @Bean
    public List<AbstractClientConnectionFactory> underlyingCF() {

        List<AbstractClientConnectionFactory> connections = new ArrayList<>();

        TcpNioClientConnectionFactory primary = new TcpNioClientConnectionFactory(primaryTcpServerHost, primaryTcpServerPort);
        primary.setSerializer(new ByteArrayClientSerializer());
        primary.setDeserializer(new ByteArrayClientSerializer());
        primary.setSingleUse(true);
        log.info("Starting with Primary Server/Port as: {}:{}", primaryTcpServerHost, primaryTcpServerPort);

        TcpNioClientConnectionFactory failover = new TcpNioClientConnectionFactory(secondaryTcpServerHost, secondaryTcpServerPort);
        failover.setSerializer(new ByteArrayClientSerializer());
        // The deserializer must be set on the failover factory itself; setting it
        // on 'primary' again would leave the failover with its default deserializer.
        failover.setDeserializer(new ByteArrayClientSerializer());
        failover.setSingleUse(true);
        log.info("Starting with Secondary Server/Port as: {}:{}", secondaryTcpServerHost, secondaryTcpServerPort);

        connections.add(primary);
        connections.add(failover);

        return connections;
    }

    /**
     * Defines the integration flow that starts at the {@link Gate} gateway
     * interface, passes each request to a TCP outbound gateway backed by
     * {@link #failoverClientFactory()}, and converts the reply to a String.
     *
     * @return the assembled integration flow
     */
    @Bean
    @DependsOn("failoverClientFactory")
    public IntegrationFlow liveMumClient() {
        FailoverClientConnectionFactory connectionFactory = failoverClientFactory();
        return IntegrationFlows
                .from(Gate.class)
                .handle(Tcp.outboundGateway(connectionFactory))
                .transform(Transformers.objectToString())
                .get();
    }

    /**
     * Messaging gateway interface used as the entry point of the
     * {@code liveMumClient} flow.
     */
    public interface Gate {
        // TODO: take the reply timeout from a property instead of hard-coding it;
        // the original note says this "seems to be unsupported" and mentions 20000,
        // while the value in effect below is 5000 -- NOTE(review): confirm intended value.
        /**
         * Sends the given bytes and returns the reply as a String.
         *
         * @param out the request payload
         * @return the reply
         */
        @Gateway(replyTimeout = 5000)
        String sendAndReceive(byte[] out);
    }
}

// Deserializer held by this (partially shown) class; initialised to a ByteArrayCrLfSerializer.
private Deserializer<?> deserializer = new ByteArrayCrLfSerializer();
    // Line terminator appended after every serialized payload.
    // Note: getBytes() without a charset uses the platform default charset.
    private static final byte[] CRLF = "\r\n".getBytes();

    /**
     * Writes the payload to the stream followed by a CR LF terminator.
     *
     * @param bytes        the payload to write
     * @param outputStream the stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void serialize(byte[] bytes, OutputStream outputStream)
            throws IOException {
        outputStream.write(bytes);
        outputStream.write(CRLF); // Author's note: the exchange stops working when this CRLF write is removed, although the raw bytes need to be read in this case.
    }

客户端序列化程序

/**
 * Serializer/deserializer for a connection whose replies come in two shapes:
 * framed as {@code STX payload ETX}, or unframed raw bytes.
 *
 * <p>Outgoing messages are always written as {@code STX payload ETX}.
 * Incoming data is inspected on its first byte: if it is {@link #STX} the
 * payload is read up to the next {@link #ETX}; otherwise every byte, including
 * that first one, is treated as payload until the stream ends or a read times
 * out.
 */
@Slf4j
public class ByteArrayClientSerializer extends AbstractPooledBufferByteArraySerializer {

    /**
     * A single reusable instance.
     * NOTE(review): the declared type is ByteArrayLiveMumSerializer, which is
     * not defined in this file -- confirm whether it should be this class.
     */
    public static final ByteArrayLiveMumSerializer INSTANCE = new ByteArrayLiveMumSerializer();

    /** ASCII start-of-text marker. */
    public static final int STX = 0x02;

    /** ASCII end-of-text marker. */
    public static final int ETX = 0x03;

    /**
     * Reads one message from the stream into a byte[].
     *
     * <p>If the first byte is {@link #STX}, bytes are collected until
     * {@link #ETX}; neither marker is part of the result. Otherwise the first
     * byte and all following bytes are collected until the stream reports end
     * of input or a read throws {@link SocketTimeoutException}, which is
     * treated as the end of the message.
     *
     * @param inputStream the stream to read from
     * @param buffer      the working buffer the payload is accumulated in
     * @return the payload, sized to the number of bytes read
     * @throws SoftEndOfStreamException if the stream is already at its end
     *         before any byte of a message has been read
     * @throws IOException if the message exceeds {@code getMaxMessageSize()}
     *         or reading fails
     */
    @Override
    public byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {

        int bite = inputStream.read();
        if (bite < 0) {
            throw new SoftEndOfStreamException("Stream closed between payloads");
        }
        int n = 0;

        try {
            if (bite == STX) {
                // Framed reply: collect everything between STX and ETX.
                while ((bite = inputStream.read()) != ETX) {
                    checkClosure(bite);
                    buffer[n++] = (byte) bite;
                    if (n >= getMaxMessageSize()) {
                        throw new IOException("ETX not found before max message length: "
                                + getMaxMessageSize());
                    }
                }
            }
            else {
                // Unframed reply: the byte already read is the first payload byte,
                // so store it before fetching the next one.
                do {
                    if (n >= getMaxMessageSize()) {
                        throw new IOException("Socket was not closed before max message length: "
                                + getMaxMessageSize());
                    }
                    buffer[n++] = (byte) bite;
                    try {
                        bite = inputStream.read();
                    }
                    catch (SocketTimeoutException e) {
                        // No terminator exists for raw replies; a read timeout ends the message.
                        bite = -1;
                    }
                }
                while (bite >= 0);
            }
            return copyToSizedArray(buffer, n);
        }
        catch (IOException | RuntimeException e) {
            // Report the failure together with the bytes read so far, then rethrow unchanged.
            publishEvent(e, buffer, n);
            throw e;
        }
    }

    /**
     * Writes the byte[] to the stream, prefixed by an ASCII STX character and
     * terminated with an ASCII ETX character.
     *
     * @param bytes the payload to write
     * @param os    the stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void serialize(byte[] bytes, OutputStream os) throws IOException {

        os.write(STX);
        os.write(bytes);
        os.write(ETX);
    }
}





org.springframework.messaging.MessagingException: Exception while awaiting reply; nested exception is java.io.EOFException: Connection is closed
    at org.springframework.integration.ip.tcp.TcpOutboundGateway$AsyncReply.doThrowErrorMessagePayload(TcpOutboundGateway.java:419) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.ip.tcp.TcpOutboundGateway$AsyncReply.getReply(TcpOutboundGateway.java:408) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.ip.tcp.TcpOutboundGateway.getReply(TcpOutboundGateway.java:209) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:161) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:233) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:96) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:86) [spring-messaging-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:495) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:469) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:564) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:489) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:464) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:453) [spring-integration-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at com.sun.proxy.$Proxy74.sendAndReceive(Unknown Source) [na:na]
    at gov.nyc.mumweb.service.MumWebService.askMum(MumWebService.java:60) [classes/:na]
    at gov.nyc.mumweb.controller.LiveMumController.recommendationService(LiveMumController.java:27) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_141]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_141]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_141]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_141]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:888) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) [spring-webmvc-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:526) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:861) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1579) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.27.jar:9.0.27]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]
Caused by: java.io.EOFException: Connection is closed
    at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory.lambda$processNioSelections$0(AbstractConnectionFactory.java:722) ~[spring-integration-ip-5.2.1.RELEASE.jar:5.2.1.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_141]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_141]
    ... 1 common frames omitted

删除调试消息消除了错误。呃!

System.out.println("Deserialized: " + new String(bytes));