Spring Integration TCP Inbound Adapter client disconnect issue

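We have created an IntegrationFlow for an inbound TCP server. Whenever we test it, the integration works fine until the client disconnects; after that we receive a continuous stream of empty messages.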

Spring Integration 5.2.5.RELEASE

Flow:

@Bean
public IntegrationFlow inboundFlow(MyService myService) {
   return IntegrationFlows.from(
       Tcp.inboundAdapter(
           Tcp.netServer(12345)
               .deserializer(TcpCodecs.lengthHeader4())))
       .transform(Transformers.objectToString())
       .log(INFO) // Logging only for testing 
       .handle(myService, "process") // Do some processing of the message
       .get();
}

Test method:

public void sendPackets() throws Exception {
    List<Path> files = getAllFilesSorted("/some/location");
    try (Socket socket = new Socket("localhost", 12345)) {
        log.info("local port: {}", socket.getLocalPort());
        try (OutputStream outputStream = socket.getOutputStream()) {
            for (int i = 0; i < 10; i++) {
                Path path = files.get(i);
                log.info("{} ({} of {})", path.toString(), i, files.size());
                byte[] bytes = Files.readAllBytes(path);
                outputStream.write(bytes);
                outputStream.flush();
                Thread.sleep(200);
            }
        }
    }
}

Partial log after the test completes:

2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=813ccf6b-9eb7-2ff3-0f8d-f2810966d8d7, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=1fd5c4c7-b2e0-6a89-7de6-894c8e42e242, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=78b852e8-0094-f8b9-8d05-37bc4912d096, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4e4a7ee0-c66e-9501-9ef7-4d25143531b3, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=c90abe17-7036-94cc-af59-3b0cac5e75e4, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=425d2e2a-75c5-0184-06da-6d27eb546931, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4c55d1d0-7421-1832-23ca-03744419c847, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=05b6fea6-8f76-5c74-e4cc-33bd8099d8e8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=9b8fe16c-07f4-b64b-faec-794eb743559c, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=222725e8-19c6-62b6-8ba7-103892191c96, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=951403b6-18a9-ffe9-29b7-67b8c01ea311, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=2ac3002b-ccd1-9506-f3aa-60f4b63e26b8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=dbbf12cf-0a27-407b-0a96-5b27ab0539db, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=3282d09e-9508-b053-3763-cc23d3c817a8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.966  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=549b2326-341f-19c0-db98-6b31da1901d8, ip_hostname=localhost, timestamp=1588672852966}]

There is nothing in the log about the client disconnecting. We would have expected to see socket close activity in the log.

The log indicates that we are receiving packets containing 0x00000000.

Use Wireshark or a similar tool to look at the activity on the wire.

EDIT

Your custom deserializer is causing the problem.

        if (read < 0) {
            return 0;
        }

When you detect that the socket has been closed, you need to throw a SoftEndOfStreamException.

This is explained in the documentation.

When the deserializer detects a closed input stream between messages, it must throw a SoftEndOfStreamException; this is a signal to the framework to indicate that the close was "normal". If the stream is closed while decoding a message, some other exception should be thrown instead.
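As a rough illustration of the rule quoted above, here is a minimal sketch of a reader for messages framed by a 4-byte length header. It is not the asker's deserializer, which is not shown in full. To keep it runnable without Spring on the classpath, it declares a local stand-in named SoftEndOfStreamException; in a real deserializer you would presumably implement org.springframework.core.serializer.Deserializer<byte[]> and throw org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException instead. The class name LengthHeaderReader and the method readMessage are hypothetical, and the sketch assumes a big-endian header and applies no maximum message size.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class LengthHeaderReader {

    /** Local stand-in for Spring Integration's SoftEndOfStreamException (assumption: used only so this sketch compiles alone). */
    public static class SoftEndOfStreamException extends IOException {
        public SoftEndOfStreamException(String message) {
            super(message);
        }
    }

    /** Reads one message framed by a 4-byte big-endian length header. */
    public static byte[] readMessage(InputStream in) throws IOException {
        int first = in.read();
        if (first < 0) {
            // Stream closed between messages: report a "normal" close
            // instead of returning 0 and producing an empty message.
            throw new SoftEndOfStreamException("Stream closed between messages");
        }
        int length = first;
        for (int i = 0; i < 3; i++) {
            int b = in.read();
            if (b < 0) {
                // Closed part-way through the header: not a normal close.
                throw new EOFException("Stream closed inside the length header");
            }
            length = (length << 8) | b;
        }
        if (length < 0) {
            throw new IOException("Negative message length: " + length);
        }
        byte[] payload = new byte[length];
        int offset = 0;
        while (offset < length) {
            int read = in.read(payload, offset, length - offset);
            if (read < 0) {
                // Closed part-way through the body: not a normal close.
                throw new EOFException("Stream closed inside the message body");
            }
            offset += read;
        }
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // One complete message ("hi") followed by end of stream.
        InputStream in = new ByteArrayInputStream(new byte[] {0, 0, 0, 2, 'h', 'i'});
        System.out.println(new String(readMessage(in), StandardCharsets.US_ASCII));
        try {
            readMessage(in);
        } catch (SoftEndOfStreamException e) {
            System.out.println("Normal close: " + e.getMessage());
        }
    }
}
```

The key point is the first read: end of stream before any header byte is treated as a normal close, while end of stream anywhere later raises a different exception, matching the distinction the documentation draws.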