如何在 spring 启动时创建工作 TCP 服务器套接字以及如何处理传入消息?

How to create a working TCP Server socket in spring boot and how to handle the incoming message?

我试图在现有的 spring 启动应用程序中实现一个带有 spring 集成的 TCP 服务器套接字,但我遇到了一个问题,这个问题让我发疯... 客户端正在向服务器发送消息(字节数组)并超时。而已。 我没有从服务器收到任何异常。看来我提供了错误的端口或其他东西,但在检查端口后,我确定它是正确的。

这是我基于注解的配置class:

import home.brew.server.socket.ServerSocketHandler;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Tcp;

@Log4j2
@Configuration
@EnableIntegration
public class TcpServerSocketConfiguration {

    @Value("${socket.port}")
    private int serverSocketPort;

    @Bean
    public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
        TcpServerConnectionFactorySpec connectionFactory = 
            Tcp.netServer(socketPort) 
              .deserializer(new CustomSerializerDeserializer())
              .serializer(new CustomSerializerDeserializer())
              .soTcpNoDelay(true);

        TcpInboundGatewaySpec inboundGateway = 
           Tcp.inboundGateway(connectionFactory);

        return IntegrationFlows
         .from(inboundGateway)
         .handle(serverSocketHandler::handleMessage)
         .get();
    }

    @Bean
    public ServerSocketHandler serverSocketHandler() {
        return new ServerSocketHandler();
    }
}

我想在尝试发送答复之前让接收功能正常工作,所以这就是为什么要进行最小配置的原因。

而下面的class应该处理从服务器套接字接收到的消息

import lombok.extern.log4j.Log4j2;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

@Log4j2
public class ServerSocketHandler {

    public String handleMessage(Message<?> message, MessageHeaders messageHeaders) {
        log.info(message.getPayload());
        // TODO implement something useful to process the incoming message here...
        return message.getPayload().toString();
    } 
}

上面的处理程序方法从未被调用过一次! 我已经在谷歌上搜索了一些示例实现或教程,但我还没有找到对我有用的东西。 我已经尝试了这些网站的实现:

  1. https://vispud.blogspot.com/2019/03/how-to-implement-simple-echo-socket.html
  2. https://docs.spring.io/spring-integration/docs/current/reference/html/ip.html#note-nio

还有很多网站...但对我没有任何帮助:-(

更新 1

我已经实现了自定义 serializer/deserializer:

import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

@Log4j2
@Data
public class CustomSerializerDeserializer implements Serializer<byte[]>, 
Deserializer<byte[]> {


@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
    return inputStream.readAllBytes();
}

@Override
public void serialize(byte[] object, OutputStream outputStream) throws IOException {
    outputStream.write(object);
}
}

客户端发送消息后,调用自定义序列化程序,但内容始终为空。我不知道为什么......序列化程序需要大量时间从流中读取所有字节,最后它是空的。这个过程一直在重复,所以我想我不小心建立了一个无限循环...

更新 2

我已经捕获了客户端和服务器套接字之间的通信: 看起来我卡在了握手过程中,因此没有有效负载...

因此,如果有人可以帮助我解决这个问题,我将非常感激,如果您需要更多信息,请告诉我。

提前致谢!

您如何与该服务器通信?默认情况下,连接工厂配置为要求输入由 CRLF 终止(例如 Telnet)。如果您的客户端使用其他东西来指示消息结束,则必须配置不同的反序列化器。

另外,您的方法签名不正确;应该是:

public String handleMessage(byte[] message, MessageHeaders messageHeaders) {
    String string = new String(message);
    System.out.println(string);
    return string.toUpperCase();
}

我使用 Telnet 时效果很好:

$ telnet localhost 1234
Trying ::1...
Connected to localhost.
Escape character is '^]'.
foo
FOO
^]
telnet> quit
Connection closed.

这是一个仅适用于 LF 的版本(例如 netcat):

@Bean
public IntegrationFlow server(ServerSocketHandler serverSocketHandler) {
    return IntegrationFlows.from(Tcp.inboundGateway(
            Tcp.netServer(1234)
                .deserializer(TcpCodecs.lf())
                .serializer(TcpCodecs.lf())))
            .handle(serverSocketHandler::handleMessage)
            .get();
}
$ nc localhost 1234
foo
FOO
^C

好吧,经过几天的分析和编码,我找到了使用 spring 集成处理 TCP 套接字通信的最佳解决方案。对于正在为同样的问题苦苦挣扎的其他开发人员。这是我到目前为止所做的。

这个 class 包含一个 - 对我来说 - 基于注释的 TCP 套接字连接配置

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.web.context.request.RequestContextListener;

/**
 * Spring annotation based configuration
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class TcpServerSocketConfiguration {

    public static final CustomSerializerDeserializer SERIALIZER = new CustomSerializerDeserializer();
    @Value("${socket.port}")
    private int socketPort;

    /**
     * Reply messages are routed to the connection only if the reply contains the ip_connectionId header
     * that was inserted into the original message by the connection factory.
     */
    @MessagingGateway(defaultRequestChannel = "toTcp")
    public interface Gateway {
        void send(String message, @Header(IpHeaders.CONNECTION_ID) String connectionId);
    }

    @Bean
    public MessageChannel fromTcp() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel toTcp() {
        return new DirectChannel();
    }

    @Bean
    public AbstractServerConnectionFactory serverCF() {
        TcpNetServerConnectionFactory serverCf = new TcpNetServerConnectionFactory(socketPort);
        serverCf.setSerializer(SERIALIZER);
        serverCf.setDeserializer(SERIALIZER);
        serverCf.setSoTcpNoDelay(true);
        serverCf.setSoKeepAlive(true);
        // serverCf.setSingleUse(true);
        // final int soTimeout = 5000;
        // serverCf.setSoTimeout(soTimeout);
        return serverCf;
    }

    @Bean
    public AbstractClientConnectionFactory clientCF() {

        TcpNetClientConnectionFactory clientCf = new TcpNetClientConnectionFactory("localhost", socketPort);
        clientCf.setSerializer(SERIALIZER);
        clientCf.setDeserializer(SERIALIZER);
        clientCf.setSoTcpNoDelay(true);
        clientCf.setSoKeepAlive(true);
        // clientCf.setSingleUse(true);
        // final int soTimeout = 5000;
        // clientCf.setSoTimeout(soTimeout);
        return clientCf;
    }

    @Bean
    public TcpInboundGateway tcpInGate() {
        TcpInboundGateway inGate = new TcpInboundGateway();
        inGate.setConnectionFactory(serverCF());
        inGate.setRequestChannel(fromTcp());
        inGate.setReplyChannel(toTcp());
        return inGate;
    }

    @Bean
    public TcpOutboundGateway tcpOutGate() {
        TcpOutboundGateway outGate = new TcpOutboundGateway();
        outGate.setConnectionFactory(clientCF());
        outGate.setReplyChannel(toTcp());
        return outGate;
    }

此 class 包含自定义序列化器和反序列化器

import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import java.nio.charset.StandardCharsets;

/**
* A custom serializer for incoming and/or outcoming messages.
*/
@Log4j2
public class CustomSerializerDeserializer implements Serializer<byte[]>, Deserializer<byte[]> {

    @NotNull
    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        byte[] message = new byte[0];
        if (inputStream.available() > 0) {
            message = inputStream.readAllBytes();
        }
        log.debug("Deserialized message {}", new String(message, StandardCharsets.UTF_8));
        return message;
    }

    @Override
    public void serialize(@NotNull byte[] message, OutputStream outputStream) throws IOException {
        log.info("Serializing {}", new String(message, StandardCharsets.UTF_8));
        outputStream.write(message);
        outputStream.flush();
    }
}

在接下来的 classes 中,您可以实现一些业务逻辑来处理传入...

import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    @ServiceActivator(inputChannel = "toTcp")
    public byte[] handleMessage(byte[] msg) {
        // TODO implement some buisiness logic here
        return msg;
    }
}

和外发消息。

import lombok.extern.log4j.Log4j2;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@MessageEndpoint
public class ClientSocketHandler {

    @ServiceActivator(inputChannel = "toTcp")
    public byte[] handleMessage(byte[] msg) {
        // implement some business logic here
        return msg;
    }
}

希望对您有所帮助。 ;-)