Spring Boot Integration - Send greeting on connection initialization

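I am writing a simple TCP server application with Spring Boot and Spring Integration. It must send a greeting to the client as soon as the connection is established. The workflow should be as follows: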

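  1. The client connects to the server
  2. The server sends the greeting message "Hello client" to the client
  3. The client sends a message "Any string" to the server
  4. The server responds with "OK"
  5. ...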
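
To make the target behavior concrete, here is a minimal sketch of the same workflow with plain java.net sockets and no Spring at all. It is only a reference for what should appear on the wire, not the implementation I am asking about. The class name PlainGreetingServer and the method serveOne are made up for illustration, and CRLF-terminated messages are an assumption chosen to match what "nc -C" sends.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical reference server: plain sockets, one client, CRLF framing (assumed).
class PlainGreetingServer {

    // Accepts exactly one client, greets it, then answers "OK" to every line received.
    static void serveOne(ServerSocket serverSocket) throws IOException {
        try (Socket client = serverSocket.accept()) {                  // step 1
            OutputStream out = client.getOutputStream();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(client.getInputStream(), StandardCharsets.US_ASCII));
            writeLine(out, "Hello client");                            // step 2
            while (in.readLine() != null) {                            // step 3
                writeLine(out, "OK");                                  // step 4
            }
        }
    }

    // Writes one CRLF-terminated message and flushes it immediately.
    private static void writeLine(OutputStream out, String line) throws IOException {
        out.write((line + "\r\n").getBytes(StandardCharsets.US_ASCII));
        out.flush();
    }
}
```

The point to notice is that the greeting is written before any request has been read, which is exactly the part that a pure request/reply setup does not cover.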

Steps 1, 3 and 4 currently work, but I am failing at step 2. My code so far:

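import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpInboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication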
@EnableIntegration
public class ExampleApp {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApp.class, args);
    }

    // Create listener on port 1234
    @Bean
    public AbstractServerConnectionFactory serverConnectionFactory() {
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
        return tcpNetServerConnectionFactory;
    }

    // Inbound channel
    @Bean
    public MessageChannel requestChannel() {
        return new DirectChannel();
    }

    // Outbound channel
    @Bean
    public MessageChannel replyChannel() {
        return new DirectChannel();
    }

    // Inbound gateway
    @Bean
    public TcpInboundGateway tcpInboundGateway(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel, MessageChannel replyChannel) {
        TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
        tcpInboundGateway.setConnectionFactory(serverConnectionFactory);
        tcpInboundGateway.setRequestChannel(requestChannel);
        tcpInboundGateway.setReplyChannel(replyChannel);
        return tcpInboundGateway;
    }

    // Send reply for incoming message -> working
    @ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
    public Message<String> processMessage(Message<String> message) {
        Message<String> reply = MessageBuilder
                .withPayload("OK")
                .setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
                .build();
        return reply;
    }

    // Send greeting -> not working
    @Bean
    public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
        return tcpConnectionEvent -> {
            if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
                Message<String> message = MessageBuilder
                        .withPayload("Hello client")
                        .setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
                        .build();
                replyChannel.send(message);
            }
        };
    }
}

If I connect to the server using

nc -C localhost 1234

the connection is established, but I get the following error in the log:

Failed to publish TcpConnectionOpenEvent [source=TcpNetConnection:localhost:37656:1234:187cfbc2-7e5d-4f4e-97de-1a3b55a4e264], [factory=serverConnectionFactory, connectionId=localhost:37656:1234:187cfbc2-7e5d-4f4e-97de-1a3b55a4e264] OPENED:Dispatcher has no subscribers for channel 'application.replyChannel'

If I send a string to the server, it replies "OK" as expected.

What am I missing to get the greeting working?

Solution

Thanks to Gary Russell's comment, I found the solution. The inbound gateway has to be split into a pair of inbound/outbound channel adapters. Here is the complete working example:

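import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication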
@EnableIntegration
public class ExampleApp {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApp.class, args);
    }

    // Create listener on port 1234
    @Bean
    public AbstractServerConnectionFactory serverConnectionFactory() {
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
        return tcpNetServerConnectionFactory;
    }

    // Inbound channel
    @Bean
    public MessageChannel requestChannel() {
        return new DirectChannel();
    }

    // Outbound channel
    @Bean
    public MessageChannel replyChannel() {
        return new DirectChannel();
    }

    // Inbound channel adapter
    @Bean
    public TcpReceivingChannelAdapter receivingChannelAdapter(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel) {
        TcpReceivingChannelAdapter tcpReceivingChannelAdapter = new TcpReceivingChannelAdapter();
        tcpReceivingChannelAdapter.setConnectionFactory(serverConnectionFactory);
        tcpReceivingChannelAdapter.setOutputChannel(requestChannel);
        return tcpReceivingChannelAdapter;
    }

    // Outbound channel adapter
    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public TcpSendingMessageHandler tcpSendingMessageHandler(AbstractServerConnectionFactory serverConnectionFactory) {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(serverConnectionFactory);
        return tcpSendingMessageHandler;
    }

    // Send reply for incoming message -> working
    @ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
    public Message<String> processMessage(Message<String> message) {
        Message<String> reply = MessageBuilder
                .withPayload("OK")
                .setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
                .build();
        return reply;
    }

    // Send greeting -> now working
    @Bean
    public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
        return tcpConnectionEvent -> {
            if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
                Message<String> message = MessageBuilder
                        .withPayload("Hello client")
                        .setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
                        .build();
                replyChannel.send(message);
            }
        };
    }
}

Now the client receives the greeting "Hello client" when the connection is established, and an "OK" reply for every message it sends.
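
For checking this from code instead of nc, a small client along the following lines should do. It is a sketch under assumptions: the class name GreetingClient and the method exchange are made up for illustration, and it assumes messages are terminated by CRLF, which is what "nc -C" sends and, as far as I know, what the TCP connection factory expects when no serializer is configured.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical test client; CRLF framing is an assumption matching "nc -C".
class GreetingClient {

    // Connects, reads the greeting, sends one request and reads the reply.
    // Returns a two-element array: {greeting, reply}.
    static String[] exchange(String host, int port, String request) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            // Time out instead of hanging forever if the greeting never arrives.
            socket.setSoTimeout(5000);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            OutputStream out = socket.getOutputStream();

            String greeting = in.readLine();                               // step 2
            out.write((request + "\r\n").getBytes(StandardCharsets.US_ASCII)); // step 3
            out.flush();
            String reply = in.readLine();                                  // step 4
            return new String[] {greeting, reply};
        }
    }
}
```

Calling GreetingClient.exchange("localhost", 1234, "Any string") against the working example should return the greeting followed by the "OK" reply. Against the gateway-based version, the first readLine would be expected to run into the read timeout, because no greeting is ever sent.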

replyChannel.send(message);

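You can't do that; the reply channel is only wired up when the first request comes in.

In any case, you can't use the gateway like that: the reply channel is for replying to requests, not for sending arbitrary messages.

You have to use a pair of inbound/outbound channel adapters instead of a gateway to enable arbitrary bidirectional communication.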
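
The timing problem can be illustrated with a toy model. This is deliberately not Spring's DirectChannel, just a hypothetical stand-in (ToyDirectChannel and ToyChannelDemo are invented names) that fails on send when nobody has subscribed yet, which is the situation the "Dispatcher has no subscribers" error describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a direct channel. NOT Spring's implementation.
class ToyDirectChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Delivers synchronously to the first subscriber; fails if there is none.
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        subscribers.get(0).accept(payload);
    }
}

class ToyChannelDemo {

    // Returns a log of what happened: delivered payloads, or an error entry.
    static List<String> run(boolean subscribeAtStartup) {
        List<String> log = new ArrayList<>();
        ToyDirectChannel replyChannel = new ToyDirectChannel();

        if (subscribeAtStartup) {
            // Like an outbound channel adapter: subscribed before any connection opens.
            replyChannel.subscribe(log::add);
        }
        try {
            replyChannel.send("Hello client"); // greeting sent on connection open
        } catch (IllegalStateException e) {
            log.add("ERROR: " + e.getMessage());
        }
        if (!subscribeAtStartup) {
            // Like the gateway: the reply channel gets its consumer with the first request.
            replyChannel.subscribe(log::add);
        }
        replyChannel.send("OK"); // reply to the first request
        return log;
    }
}
```

In this model, run(false) logs the error for the greeting and then delivers "OK", which mirrors the behavior in the question, while run(true) delivers both messages, which mirrors the adapter-based setup.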